Files
markbase/markbase-core/src/ssh_server/server.rs
Warren bd89152e81
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled
feat(ssh): Optimize SSH performance Phase 1-2c + stdin fix
Phase 1: take_payload() optimization
- cipher.rs: Added take_payload() to EncryptedPacket
- server.rs: Use take_payload() to avoid .to_vec() copy

Phase 2a: reuse_buf for CHANNEL_DATA
- channel.rs: Added reuse_buf to ExecProcess
- handle_channel_data(): Read directly into reuse buffer

Phase 2b: read_buf for stdout/stderr
- channel.rs: Added read_buf to ExecProcess
- poll_exec_stdout_and_client(): Use read_buf for all reads

Phase 2c: AES-GCM padding optimization
- cipher.rs: Removed padding .to_vec() in AES-GCM decrypt

stdin fix: All exec commands use interactive process
- channel.rs: Removed conditional rsync/SCP detection
- All exec commands now use handle_interactive_exec()
- Fixes cat/grep/sed stdin support (small files working)

AES-GCM improvements:
- cipher.rs: Added CipherMode enum (AES-GCM vs AES-CTR)
- cipher.rs: AES-256 key derivation (32 bytes)
- cipher.rs: Nonce format follows OpenSSH inc_iv()
- kex.rs: Added aes256-gcm@openssh.com to algorithms

Performance: ~21% improvement for small files
Test: 158 passed, 0 failed
Limitation: Large files (>10MB) not working yet (poll loop issue)
2026-06-19 20:18:20 +08:00

710 lines
29 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// SSH服务器完整实现Phase 1-7集成版 + Phase 13端口转发
// 参考OpenSSH sshd.c: complete SSH/SFTP flow + port forwarding
use crate::provider::pg::PgProvider;
use crate::provider::sqlite::SqliteProvider;
use crate::provider::DataProvider;
use crate::ssh_server::auth::{AuthHandler, AuthResult};
use crate::ssh_server::channel::ChannelManager;
use crate::ssh_server::cipher::{EncryptedPacket, EncryptionContext};
use crate::ssh_server::kex::{KexProposal, KexResult};
use crate::ssh_server::kex_complete::KexState;
use crate::ssh_server::packet::{PacketType, SshPacket};
use crate::ssh_server::port_forward::PortForwardManager;
use crate::ssh_server::ssh_security_config::SshSecurityConfig;
use crate::ssh_server::upload_hook::UploadHook;
use crate::ssh_server::version::VersionExchange;
use anyhow::{anyhow, Result};
use log::{error, info, warn};
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;
pub struct SshServerConfig {
pub port: u16,
pub bind_address: String,
pub security_config: SshSecurityConfig,
pub pg_conn: Option<String>,
pub upload_hook_config: crate::config::UploadHookSection,
}
impl Default for SshServerConfig {
fn default() -> Self {
Self {
port: 2024,
bind_address: "127.0.0.1".to_string(),
security_config: SshSecurityConfig::enterprise_default(),
pg_conn: None,
upload_hook_config: crate::config::UploadHookSection::default(),
}
}
}
impl SshServerConfig {
/// 从配置文件加载Phase 13.1
pub fn load_from_file(path: &str) -> Result<Self> {
let config = SshSecurityConfig::load_from_file(path)?;
Ok(Self {
port: 2024,
bind_address: "127.0.0.1".to_string(),
security_config: config,
pg_conn: None,
upload_hook_config: crate::config::UploadHookSection::default(),
})
}
}
/// SSH服务器主结构Phase 1-13完整版
pub struct SshServer {
config: SshServerConfig,
security_config: Arc<Mutex<SshSecurityConfig>>, // Phase 13.1: 共享安全配置
}
impl SshServer {
pub fn new(config: SshServerConfig) -> Self {
let security_config = Arc::new(Mutex::new(config.security_config.clone())); // Phase 13.1: 先clone
Self {
config,
security_config, // Phase 13.1
}
}
pub fn run(&self) -> Result<()> {
let bind_addr = format!("{}:{}", self.config.bind_address, self.config.port);
let listener = TcpListener::bind(&bind_addr)?;
info!("MarkBaseSSH server listening on {}", bind_addr);
info!("Implementation: Complete SSH/SFTP + Port Forwarding (Phase 1-13)");
info!(
"Security config: GatewayPorts={}, PermitOpen={:?}, MaxSessions={}",
self.config.security_config.gateway_ports,
self.config.security_config.permit_open,
self.config.security_config.max_sessions
);
let security_config = self.security_config.clone();
let pg_conn = self.config.pg_conn.clone();
let upload_hook_config = self.config.upload_hook_config.clone();
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let client_addr = stream.peer_addr()?;
info!("New SSH connection from {}", client_addr);
let security_config_clone = security_config.clone();
let pg_conn_clone = pg_conn.clone();
let upload_hook_config_clone = upload_hook_config.clone();
thread::spawn(move || {
if let Err(e) = handle_connection_complete(
stream,
security_config_clone,
pg_conn_clone,
upload_hook_config_clone,
)
{
error!("Connection error: {}", e);
}
});
}
Err(e) => {
warn!("Failed to accept connection: {}", e);
}
}
}
Ok(())
}
}
/// 处理完整SSH连接Phase 1-13完整流程
fn handle_connection_complete(
stream: TcpStream,
security_config: Arc<Mutex<SshSecurityConfig>>,
pg_conn: Option<String>,
upload_hook_config: crate::config::UploadHookSection,
) -> Result<()> {
info!("Handling client connection (Phase 1-13 complete flow with port forwarding)");
// Phase 13.1: 增加活动会话数
{
let mut security = security_config.lock().unwrap();
security.increment_sessions()?;
}
let mut stream = stream;
// Phase 1: 版本交换
let client_version = VersionExchange::exchange(&mut stream)?;
info!(
"Version exchange: client={}, server=SSH-2.0-MarkBaseSSH_1.0",
client_version
);
// Phase 2: 箋法协商
let (kex_result, server_kexinit, client_kexinit) =
perform_kex_negotiation_complete(&mut stream)?;
info!(
"KEX negotiation: KEX={}, Cipher={}",
kex_result.kex_algorithm, kex_result.encryption_ctos
);
// Phase 3: 密钥交换完整流程
let mut encryption_ctx = perform_complete_kex_exchange(
&mut stream,
client_version.clone(),
kex_result,
server_kexinit,
client_kexinit,
)?;
info!("Key exchange completed, encryption channel ready");
// Phase 5: SSH认证SFTPGo兼容 — PostgreSQL或SQLite
let provider: Box<dyn DataProvider> = if let Some(ref conn_str) = pg_conn {
info!(
"Using PostgreSQL auth provider (SFTPGo-compatible): {}",
conn_str
);
Box::new(
PgProvider::new(conn_str).map_err(|e| anyhow!("Failed to init PgProvider: {}", e))?,
)
} else {
info!("Using SQLite auth provider");
Box::new(
SqliteProvider::new("data/auth.sqlite")
.map_err(|e| anyhow!("Failed to init SqliteProvider: {}", e))?,
)
};
let mut auth_handler = AuthHandler::new(provider);
let auth_user = perform_ssh_auth(&mut stream, &mut auth_handler, &mut encryption_ctx)?;
info!("SSH authentication succeeded: user={}", auth_user.username);
let upload_hook = if upload_hook_config.enabled {
Some(Arc::new(UploadHook::new(
upload_hook_config.enabled,
PathBuf::from(&upload_hook_config.video_probe_path),
PathBuf::from(&upload_hook_config.video_register_cli),
PathBuf::from(&upload_hook_config.video_register_dir),
upload_hook_config.video_extensions.clone(),
)))
} else {
None
};
let mut channel_manager = ChannelManager::new(
auth_user.home_dir.clone(),
upload_hook,
auth_user.username.clone(),
);
// Phase 13: PortForwardManager初始化
let mut port_forward_manager = PortForwardManager::new();
// Phase 6-13: SSH服务循环处理channel请求 + 端口转发)
let security_config_clone = security_config.clone(); // Phase 13.1: clone for service loop
handle_ssh_service_loop(
&mut stream,
&mut channel_manager,
&mut encryption_ctx,
&mut port_forward_manager,
security_config_clone,
)?;
info!("SSH session completed successfully");
// Phase 13.1: 减少活动会话数
{
let mut security = security_config.lock().unwrap();
security.decrement_sessions();
}
Ok(())
}
/// 完整算法协商返回KEXINIT payloads
fn perform_kex_negotiation_complete(
stream: &mut TcpStream,
) -> Result<(KexResult, SshPacket, SshPacket)> {
info!("Starting complete KEX negotiation");
// 1. 发送服务器KEXINIT
let server_proposal = KexProposal::server_default();
let server_kexinit = server_proposal.to_kexinit_packet()?;
server_kexinit.write(stream)?;
info!(
"Sent server KEXINIT (payload size: {} bytes)",
server_kexinit.payload.len()
);
// 2. 接收客户端KEXINIT
let client_kexinit = SshPacket::read(stream)?;
let client_proposal = KexProposal::from_kexinit_packet(&client_kexinit)?;
info!(
"Received client KEXINIT (payload size: {} bytes)",
client_kexinit.payload.len()
);
// 3. 算法匹配
let kex_result = KexResult::choose_algorithms(&server_proposal, &client_proposal)?;
Ok((kex_result, server_kexinit, client_kexinit))
}
/// 完整密钥交换流程Phase 3核心
fn perform_complete_kex_exchange(
stream: &mut TcpStream,
client_version: String,
kex_result: KexResult,
server_kexinit: SshPacket,
client_kexinit: SshPacket,
) -> Result<EncryptionContext> {
info!("Starting complete key exchange flow");
let mut kex_state = KexState::new(
client_version,
"SSH-2.0-MarkBaseSSH_1.0".to_string(),
kex_result.clone(), // Phase 1: clone kex_result for cipher mode setting
)?;
kex_state.save_kexinit_payloads(&client_kexinit, &server_kexinit);
let kexdh_init = SshPacket::read(stream)?;
info!("Received SSH_MSG_KEX_ECDH_INIT");
let kexdh_reply = kex_state.exchange_handler.handle_kexdh_init(
&kexdh_init,
&kex_state.client_version,
&kex_state.server_version,
&kex_state.client_kexinit_payload,
&kex_state.server_kexinit_payload,
)?;
kexdh_reply.write(stream)?;
info!("Sent SSH_MSG_KEX_ECDH_REPLY");
// Strict KEX: Wait for client NEWKEYS first (OpenSSH 10.2 requirement)
let client_newkeys = SshPacket::read(stream)?;
kex_state.handle_newkeys(&client_newkeys)?;
info!("Received SSH_MSG_NEWKEYS from client");
// Now send server NEWKEYS
let newkeys_packet = KexState::send_newkeys()?;
newkeys_packet.write(stream)?;
kex_state.newkeys_sent = true;
info!("Sent SSH_MSG_NEWKEYS from server");
if kex_state.is_encryption_ready() {
info!("Encryption channel established successfully");
} else {
return Err(anyhow::anyhow!("Encryption channel not ready"));
}
let session_keys = kex_state.exchange_handler.compute_session_keys()?;
let mut encryption_ctx = EncryptionContext::from_session_keys(&session_keys);
// Phase 1: 根据 KEX 协商结果设置加密模式AES-GCM vs AES-CTR
let encryption_algorithm = &kex_result.encryption_stoc;
info!("KEX negotiated encryption algorithm: {}", encryption_algorithm);
use crate::ssh_server::cipher::CipherMode;
if encryption_algorithm.contains("gcm") {
info!("Setting cipher mode to AES-GCM (AEAD)");
encryption_ctx.set_cipher_mode(CipherMode::AesGcm)?;
} else {
info!("Setting cipher mode to AES-CTR (MtE)");
encryption_ctx.set_cipher_mode(CipherMode::AesCtr)?;
}
Ok(encryption_ctx)
}
/// SSH认证流程Phase 5
pub struct AuthUser {
pub username: String,
pub home_dir: PathBuf,
}
fn perform_ssh_auth(
stream: &mut TcpStream,
auth_handler: &mut AuthHandler,
encryption_ctx: &mut EncryptionContext,
) -> Result<AuthUser> {
info!("Starting SSH authentication");
info!(
"Encryption context: key_ctos_len={}, key_stoc_len={}, iv_ctos_len={}, iv_stoc_len={}",
encryption_ctx.encryption_key_ctos.len(),
encryption_ctx.encryption_key_stoc.len(),
encryption_ctx.iv_ctos.len(),
encryption_ctx.iv_stoc.len()
);
// OpenSSH strict KEX: SSH_MSG_EXT_INFO may be sent before SSH_MSG_SERVICE_REQUEST
let mut encrypted_request = EncryptedPacket::read(stream, encryption_ctx, true)?;
let payload = encrypted_request.payload();
if payload[0] == PacketType::SSH_MSG_EXT_INFO as u8 {
info!("Received SSH_MSG_EXT_INFO, reading next packet");
encrypted_request = EncryptedPacket::read(stream, encryption_ctx, true)?;
}
let payload = encrypted_request.payload();
info!("Received packet type: {}", payload[0]);
if payload[0] != PacketType::SSH_MSG_SERVICE_REQUEST as u8 {
return Err(anyhow!(
"Expected SSH_MSG_SERVICE_REQUEST, got type {}",
payload[0]
));
}
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
let mut cursor = std::io::Cursor::new(&payload[1..]);
let service_name_len = cursor.read_u32::<BigEndian>()?;
let mut service_name = vec![0u8; service_name_len as usize];
cursor.read_exact(&mut service_name)?;
let service_name_str = String::from_utf8_lossy(&service_name);
if service_name_str != "ssh-userauth" {
return Err(anyhow!("Unsupported service: {}", service_name_str));
}
let mut service_accept_payload = Vec::new();
service_accept_payload.write_u8(PacketType::SSH_MSG_SERVICE_ACCEPT as u8)?;
service_accept_payload.write_u32::<BigEndian>(12)?; // "ssh-userauth" length is 12, not 14!
service_accept_payload.write_all("ssh-userauth".as_bytes())?;
let encrypted_accept = EncryptedPacket::new(&service_accept_payload, encryption_ctx, true)?;
encrypted_accept.write(stream)?;
info!("Sent encrypted SSH_MSG_SERVICE_ACCEPT");
let session_id = encryption_ctx.session_id.clone();
loop {
let auth_packet = EncryptedPacket::read(stream, encryption_ctx, true)?; // Reading from client, use cipher_ctos
let auth_payload = auth_packet.payload();
info!("Received encrypted SSH_MSG_USERAUTH_REQUEST");
let auth_request = SshPacket::new(auth_payload.to_vec());
match auth_handler.handle_userauth_request(&auth_request, &session_id)? {
AuthResult::Success => {
let success_payload = vec![PacketType::SSH_MSG_USERAUTH_SUCCESS as u8];
let encrypted_success =
EncryptedPacket::new(&success_payload, encryption_ctx, true)?;
encrypted_success.write(stream)?;
info!("Sent encrypted SSH_MSG_USERAUTH_SUCCESS");
// Extract username from auth request
let user = extract_username_from_auth_request(&auth_request)
.unwrap_or_else(|_| "unknown".to_string());
let home_dir = auth_handler
.get_home_dir(&user)
.ok()
.flatten()
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("/Users/accusys/markbase"));
info!("Auth success: user={}, home_dir={:?}", user, home_dir);
return Ok(AuthUser {
username: user,
home_dir,
});
}
AuthResult::Failure(message) => {
// message包含可用的认证方法列表如"password,publickey"
let mut failure_payload = Vec::new();
failure_payload.write_u8(PacketType::SSH_MSG_USERAUTH_FAILURE as u8)?;
failure_payload.write_u32::<BigEndian>(message.len() as u32)?;
failure_payload.write_all(message.as_bytes())?;
failure_payload.write_u8(0)?; // partial_success = false
let encrypted_failure =
EncryptedPacket::new(&failure_payload, encryption_ctx, true)?;
encrypted_failure.write(stream)?;
warn!("Sent encrypted SSH_MSG_USERAUTH_FAILURE: {}", message);
}
AuthResult::PartialSuccess => {
warn!("Partial success auth not implemented");
continue;
}
AuthResult::PublicKeyOk(algorithm, public_key_blob) => {
// SSH_MSG_USERAUTH_PK_OKpublic key acceptable
info!("Public key acceptable, sending USERAUTH_PK_OK");
let mut pk_ok_payload = Vec::new();
pk_ok_payload.write_u8(PacketType::SSH_MSG_USERAUTH_PK_OK as u8)?;
// algorithm (SSH string)
pk_ok_payload.write_u32::<BigEndian>(algorithm.len() as u32)?;
pk_ok_payload.write_all(algorithm.as_bytes())?;
// public key blob (SSH string)
pk_ok_payload.write_u32::<BigEndian>(public_key_blob.len() as u32)?;
pk_ok_payload.write_all(&public_key_blob)?;
let encrypted_pk_ok = EncryptedPacket::new(&pk_ok_payload, encryption_ctx, true)?;
encrypted_pk_ok.write(stream)?;
info!("Sent SSH_MSG_USERAUTH_PK_OK");
continue; // Wait for signed request
}
}
}
}
/// SSH服务循环Phase 14.2: OpenSSH统一poll + child状态检测版
/// ⭐⭐⭐⭐⭐ 关键改进单次poll同时监听client + stdout + stderr + child状态
/// 参考OpenSSH session.c: do_exec_no_pty() + channel.c: channel_handle_fd()
fn handle_ssh_service_loop(
stream: &mut TcpStream,
channel_manager: &mut ChannelManager,
encryption_ctx: &mut EncryptionContext,
port_forward_manager: &mut PortForwardManager, // Phase 13
security_config: Arc<Mutex<SshSecurityConfig>>, // Phase 13.1
) -> Result<()> {
info!("Starting SSH service loop (Phase 14.2: unified poll + child status)");
loop {
// ⭐⭐⭐⭐⭐ Phase 14.2: 统一poll + child状态检测
let (stdout_packets, client_has_data, child_exited) =
channel_manager.poll_exec_stdout_and_client(stream)?;
// 1. 发送stdout/stderr数据如果有
if let Some(packets) = stdout_packets {
// Phase 4: Batch encrypt all packets in parallel
let payloads: Vec<&[u8]> = packets.iter().map(|p| p.payload.as_slice()).collect();
let encrypted_packets = EncryptedPacket::new_batch(&payloads, encryption_ctx, true)?;
for encrypted_packet in &encrypted_packets {
encrypted_packet.write(stream)?;
}
info!("Sent stdout/stderr data ({} packets)", packets.len());
}
// 2. 处理child exited发送EOF + CLOSE
if child_exited {
info!("Child process exited, sending SSH_MSG_CHANNEL_EOF + CLOSE");
// ⭐⭐⭐⭐⭐ Phase 14.2: 使用ChannelManager.handle_child_exited()
let exit_packets = channel_manager.handle_child_exited()?;
// Phase 4: Batch encrypt exit packets in parallel
let exit_payloads: Vec<&[u8]> = exit_packets.iter().map(|p| p.payload.as_slice()).collect();
let encrypted_exit = EncryptedPacket::new_batch(&exit_payloads, encryption_ctx, true)?;
for packet in encrypted_exit {
packet.write(stream)?;
}
// 继续处理client数据可能还有其他请求
}
// 3. 处理client数据如果有
if !client_has_data {
// client没有数据继续下一轮循环
continue;
}
// client有数据读取并处理
let mut encrypted_packet = EncryptedPacket::read(stream, encryption_ctx, true)?;
let packet = SshPacket::new(encrypted_packet.take_payload());
match packet.payload.first() {
// Phase 13: SSH_MSG_GLOBAL_REQUEST处理端口转发
Some(&pt) if pt == PacketType::SSH_MSG_GLOBAL_REQUEST as u8 => {
info!("Received SSH_MSG_GLOBAL_REQUEST (port forwarding)");
// Phase 13.1: 安全配置验证
let security = security_config.lock().unwrap();
if !security.allow_tcp_forwarding {
warn!("TCP forwarding disabled by security config");
let failure_packet = vec![PacketType::SSH_MSG_REQUEST_FAILURE as u8];
let encrypted_failure =
EncryptedPacket::new(&failure_packet, encryption_ctx, true)?;
encrypted_failure.write(stream)?;
info!("Sent SSH_MSG_REQUEST_FAILURE (TCP forwarding disabled)");
continue;
}
// Phase 13.2: 调用PortForwardManager处理传递security_config
let (success, response) =
port_forward_manager.handle_global_request(&packet.payload, &security)?;
drop(security); // 释放锁
if success {
if let Some(response_data) = response {
let encrypted_response =
EncryptedPacket::new(&response_data, encryption_ctx, true)?;
encrypted_response.write(stream)?;
info!("Sent SSH_MSG_REQUEST_SUCCESS (tcpip-forward accepted)");
} else {
// 无响应数据时发送简单的SUCCESS
let success_packet = vec![PacketType::SSH_MSG_REQUEST_SUCCESS as u8];
let encrypted_success =
EncryptedPacket::new(&success_packet, encryption_ctx, true)?;
encrypted_success.write(stream)?;
info!("Sent SSH_MSG_REQUEST_SUCCESS");
}
} else {
let failure_packet = vec![PacketType::SSH_MSG_REQUEST_FAILURE as u8];
let encrypted_failure =
EncryptedPacket::new(&failure_packet, encryption_ctx, true)?;
encrypted_failure.write(stream)?;
info!("Sent SSH_MSG_REQUEST_FAILURE (tcpip-forward rejected)");
}
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_OPEN as u8 => {
info!("Received SSH_MSG_CHANNEL_OPEN");
// Phase 13.3: 获取security_config并传递给handle_channel_open
let security = security_config.lock().unwrap();
let response = channel_manager.handle_channel_open(&packet, Some(&security))?;
drop(security); // 释放锁
let encrypted_response =
EncryptedPacket::new(&response.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
info!("Sent SSH_MSG_CHANNEL_OPEN_CONFIRMATION");
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_REQUEST as u8 => {
info!("Received SSH_MSG_CHANNEL_REQUEST");
if let Some(response) = channel_manager.handle_channel_request(&packet)? {
let encrypted_response =
EncryptedPacket::new(&response.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
// ⭐⭐⭐⭐⭐ Phase 14.5修复:区分普通命令和交互式进程
// 检查是否有 exec_process交互式进程如 rsync
let has_exec_process = channel_manager.has_exec_process();
if has_exec_process {
info!("⭐⭐⭐⭐⭐ [INTERACTIVE_PROCESS] Detected exec_process (rsync/SCP), skipping immediate EOF");
// 对于交互式进程,只发送 SUCCESS等待 poll 循环处理数据流
// 不立即发送 EOF + CLOSE
} else {
// Phase 6: 普通命令执行,检查是否有命令输出需要发送
if let Some(channel_id) = channel_manager.get_channel_with_output() {
if let Some(output) = channel_manager.get_channel_output(channel_id) {
// 发送命令输出SSH_MSG_CHANNEL_DATA
let data_packet =
channel_manager.build_channel_data(channel_id, &output)?;
let encrypted_data = EncryptedPacket::new(
&data_packet.payload,
encryption_ctx,
true,
)?;
encrypted_data.write(stream)?;
info!("Sent command output ({} bytes)", output.len());
// 发送SSH_MSG_CHANNEL_EOF
let eof_packet = channel_manager.build_channel_eof(channel_id)?;
let encrypted_eof = EncryptedPacket::new(
&eof_packet.payload,
encryption_ctx,
true,
)?;
encrypted_eof.write(stream)?;
info!("Sent SSH_MSG_CHANNEL_EOF");
// 发送SSH_MSG_CHANNEL_CLOSE
let close_packet =
channel_manager.build_channel_close(channel_id)?;
let encrypted_close = EncryptedPacket::new(
&close_packet.payload,
encryption_ctx,
true,
)?;
encrypted_close.write(stream)?;
info!("Sent SSH_MSG_CHANNEL_CLOSE");
// 移除channel
channel_manager.remove_channel(channel_id);
}
}
}
}
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_DATA as u8 => {
if let Some(response) = channel_manager.handle_channel_data(&packet)? {
let encrypted_response =
EncryptedPacket::new(&response.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
info!("Sent SSH_MSG_CHANNEL_DATA (SFTP response)");
}
while let Some(pending) = channel_manager.pending_packets.pop_front() {
let encrypted_pending =
EncryptedPacket::new(&pending.payload, encryption_ctx, true)?;
encrypted_pending.write(stream)?;
}
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_CLOSE as u8 => {
if let Some(response) = channel_manager.handle_channel_close(&packet)? {
let encrypted_response =
EncryptedPacket::new(&response.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
}
break;
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_EOF as u8 => {
info!("Received SSH_MSG_CHANNEL_EOF");
// Phase 17: EOF means client won't send more data → close child stdin
// (Essential for SCP upload where scp -t waits for EOF on stdin)
channel_manager.close_child_stdin();
}
Some(&pt) if pt == PacketType::SSH_MSG_DISCONNECT as u8 => {
info!("Received SSH_MSG_DISCONNECT");
break;
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_WINDOW_ADJUST as u8 => {
let payload = &packet.payload;
if payload.len() >= 9 {
// Format: uint32 recipient_channel || uint32 bytes_to_add
let recipient_channel =
u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]);
let bytes_to_add =
u32::from_be_bytes([payload[5], payload[6], payload[7], payload[8]]);
channel_manager.adjust_remote_window(recipient_channel, bytes_to_add);
}
}
_ => {
warn!("Unknown packet type: {:?}", packet.payload.first());
}
}
}
Ok(())
}
/// 从SSH_MSG_USERAUTH_REQUEST payload中提取用户名
fn extract_username_from_auth_request(
packet: &crate::ssh_server::packet::SshPacket,
) -> Result<String> {
let payload = &packet.payload;
if payload.len() < 5 {
return Err(anyhow!("Auth request too short"));
}
let name_len = u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]) as usize;
if payload.len() < 5 + name_len {
return Err(anyhow!("Auth request truncated"));
}
let username = String::from_utf8_lossy(&payload[5..5 + name_len]).to_string();
Ok(username)
}
/// SSH服务器CLI入口
pub fn run_ssh_server(port: Option<u16>, pg_conn: Option<&str>) -> Result<()> {
let config = SshServerConfig {
port: port.unwrap_or(2024),
bind_address: "127.0.0.1".to_string(),
security_config: SshSecurityConfig::enterprise_default(),
pg_conn: pg_conn.map(|s| s.to_string()),
upload_hook_config: crate::config::UploadHookSection::default(),
};
let server = SshServer::new(config);
server.run()
}