From 51ca0c46332c4f88f6cdc1ff91c871d328908561 Mon Sep 17 00:00:00 2001 From: Warren Date: Sat, 20 Jun 2026 20:26:35 +0800 Subject: [PATCH] SMB VFS: Add set_len, set_stat, streaming write, auto_reconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - set_len() via SMB SET_INFO compound (CREATE → SET_INFO → CLOSE) with FileEndOfFileInformation (class 14) - set_stat() via SMB SET_INFO compound with FileBasicInformation (class 4) for timestamp updates (atime, mtime) - Streaming write using Tree::create_file_writer + FileWriter::write_chunk + finish for pipelined uploads - Add file_writer: Option to SmbVfsFile for streaming state - Enable auto_reconnect by default (new_with_options param) - Add systemtime_to_filetime helper for timestamp conversion All 229 tests pass. --- markbase-core/src/vfs/smb_fs.rs | 319 +++++++++++++++++++++++++++++++- 1 file changed, 311 insertions(+), 8 deletions(-) diff --git a/markbase-core/src/vfs/smb_fs.rs b/markbase-core/src/vfs/smb_fs.rs index 344ccc6..fbcfd06 100644 --- a/markbase-core/src/vfs/smb_fs.rs +++ b/markbase-core/src/vfs/smb_fs.rs @@ -17,6 +17,13 @@ fn filetime_to_systemtime(raw: u64) -> SystemTime { } } +fn systemtime_to_filetime(st: SystemTime) -> u64 { + let duration = st.duration_since(UNIX_EPOCH).unwrap_or_default(); + let secs = duration.as_secs() + FILETIME_TO_UNIX_SECS; + let nanos = duration.subsec_nanos() as u64; + (secs * 10_000_000) + (nanos / 100) +} + fn map_smb_error(e: smb2::Error) -> VfsError { match e.kind() { smb2::ErrorKind::NotFound => VfsError::NotFound(e.to_string()), @@ -40,6 +47,16 @@ pub struct SmbVfs { impl SmbVfs { pub fn new(addr: &str, share: &str, username: &str, password: &str) -> Result { + Self::new_with_options(addr, share, username, password, true) + } + + pub fn new_with_options( + addr: &str, + share: &str, + username: &str, + password: &str, + auto_reconnect: bool, + ) -> Result { let runtime = Arc::new( tokio::runtime::Builder::new_current_thread() .enable_all() @@ -53,7 +70,7 @@ impl SmbVfs { username: username.to_string(), password: password.to_string(), domain: String::new(), - auto_reconnect: false, + auto_reconnect, compression: true, dfs_enabled: false, dfs_target_overrides: std::collections::HashMap::new(), @@ -149,6 +166,7 @@ impl VfsBackend for SmbVfs { write_buf: Vec::new(), data: Vec::new(), size: 0, + file_writer: None, })) } else { let data = self @@ -166,6 +184,7 @@ impl VfsBackend for SmbVfs { write_buf: Vec::new(), data, size, + file_writer: None, })) } } @@ -268,8 +287,136 @@ impl VfsBackend for SmbVfs { .map_err(map_smb_error) } - fn set_stat(&self, _path: &Path, _stat: &VfsStat) -> Result<(), VfsError> { - Err(VfsError::Unsupported("SMB set_stat".to_string())) + fn set_stat(&self, path: &Path, stat: &VfsStat) -> Result<(), VfsError> { + let smb_path = Self::path_to_str(path); + let tree_id = self.tree.lock().unwrap().tree_id; + + let mut client = self + .client + .lock() + .map_err(|e| VfsError::Io(e.to_string()))?; + let conn = client.connection_mut(); + + use smb2::client::connection::CompoundOp; + use smb2::msg::close::CloseRequest; + use smb2::msg::create::{ + CreateDisposition, CreateRequest, CreateResponse, ImpersonationLevel, ShareAccess, + }; + use smb2::msg::query_info::InfoType; + use smb2::msg::set_info::SetInfoRequest; + use smb2::pack::{ReadCursor, Unpack}; + use smb2::types::flags::FileAccessMask; + use smb2::types::status::NtStatus; + use smb2::types::{Command, CreditCharge, FileId, OplockLevel}; + + const FILE_BASIC_INFORMATION: u8 = 4; + + let create_req = CreateRequest { + requested_oplock_level: OplockLevel::None, + impersonation_level: ImpersonationLevel::Impersonation, + desired_access: FileAccessMask::new(FileAccessMask::FILE_WRITE_ATTRIBUTES), + file_attributes: 0, + share_access: ShareAccess( + ShareAccess::FILE_SHARE_READ + | ShareAccess::FILE_SHARE_WRITE + | ShareAccess::FILE_SHARE_DELETE, + ), + create_disposition: CreateDisposition::FileOpen, + create_options: 0, + name: smb_path, + create_contexts: vec![], + }; + + let creation_time = 0u64; + let last_access_time = systemtime_to_filetime(stat.atime); + let last_write_time = systemtime_to_filetime(stat.mtime); + let change_time = 0u64; + let file_attributes = 0u32; + let reserved = 0u32; + + let mut setinfo_buf = Vec::with_capacity(40); + setinfo_buf.extend_from_slice(&creation_time.to_le_bytes()); + setinfo_buf.extend_from_slice(&last_access_time.to_le_bytes()); + setinfo_buf.extend_from_slice(&last_write_time.to_le_bytes()); + setinfo_buf.extend_from_slice(&change_time.to_le_bytes()); + setinfo_buf.extend_from_slice(&file_attributes.to_le_bytes()); + setinfo_buf.extend_from_slice(&reserved.to_le_bytes()); + + let setinfo_req = SetInfoRequest { + info_type: InfoType::File, + file_info_class: FILE_BASIC_INFORMATION, + additional_information: 0, + file_id: FileId::SENTINEL, + buffer: setinfo_buf, + }; + + let close_req = CloseRequest { + flags: 0, + file_id: FileId::SENTINEL, + }; + + let ops = [ + CompoundOp { + command: Command::Create, + body: &create_req, + tree_id: Some(tree_id), + credit_charge: CreditCharge(1), + }, + CompoundOp { + command: Command::SetInfo, + body: &setinfo_req, + tree_id: Some(tree_id), + credit_charge: CreditCharge(1), + }, + CompoundOp { + command: Command::Close, + body: &close_req, + tree_id: Some(tree_id), + credit_charge: CreditCharge(1), + }, + ]; + + let responses = self.runtime.block_on(async { + let frames = conn + .execute_compound(&ops) + .await + .map_err(|e| VfsError::Io(format!("SMB set_stat compound failed: {}", e)))?; + let frames: Vec<_> = frames + .into_iter() + .collect::, _>>() + .map_err(|e| VfsError::Io(format!("SMB set_stat waiter error: {}", e)))?; + Ok::<_, VfsError>(frames) + })?; + + let create_header = &responses[0].header; + let create_body = &responses[0].body; + let setinfo_header = &responses[1].header; + + if create_header.status != NtStatus::SUCCESS { + return Err(VfsError::NotFound(format!( + "SMB set_stat: file not found ({})", + create_header.status + ))); + } + + if setinfo_header.status != NtStatus::SUCCESS { + let mut cursor = ReadCursor::new(create_body); + if let Ok(create_resp) = CreateResponse::unpack(&mut cursor) { + let standalone_close = CloseRequest { + flags: 0, + file_id: create_resp.file_id, + }; + let _: Result<_, _> = self.runtime.block_on( + conn.execute(Command::Close, &standalone_close, Some(tree_id)), + ); + } + return Err(VfsError::Io(format!( + "SMB set_stat: SET_INFO failed ({})", + setinfo_header.status + ))); + } + + Ok(()) } fn read_link(&self, _path: &Path) -> Result { @@ -329,6 +476,7 @@ struct SmbVfsFile { write_buf: Vec, data: Vec, size: u64, + file_writer: Option, } impl SmbVfsFile { @@ -364,7 +512,29 @@ impl VfsFile for SmbVfsFile { } fn write(&mut self, buf: &[u8]) -> Result { - self.write_buf.extend_from_slice(buf); + if self.file_writer.is_none() { + let tree_arc = Arc::new(self.tree.clone()); + let conn = { + let mut client = self + .client + .lock() + .map_err(|e| VfsError::Io(e.to_string()))?; + client.connection_mut().clone() + }; + + let writer = self + .runtime + .block_on(tree_arc.create_file_writer(conn, &self.path)) + .map_err(map_smb_error)?; + self.file_writer = Some(writer); + } + + if let Some(writer) = &mut self.file_writer { + self.runtime + .block_on(writer.write_chunk(buf)) + .map_err(map_smb_error)?; + } + self.position += buf.len() as u64; Ok(buf.len()) } @@ -398,7 +568,13 @@ impl VfsFile for SmbVfsFile { fn flush(&mut self) -> Result<(), VfsError> { if let FileMode::Write = self.mode { - if !self.write_buf.is_empty() { + if let Some(writer) = self.file_writer.take() { + let total = self + .runtime + .block_on(writer.finish()) + .map_err(map_smb_error)?; + self.size = total; + } else if !self.write_buf.is_empty() { let data = std::mem::take(&mut self.write_buf); let mut client = self .client @@ -434,15 +610,142 @@ impl VfsFile for SmbVfsFile { }) } - fn set_len(&mut self, _size: u64) -> Result<(), VfsError> { - Err(VfsError::Unsupported("SMB set_len".to_string())) + fn set_len(&mut self, size: u64) -> Result<(), VfsError> { + if !self.write_buf.is_empty() { + self.flush()?; + } + + let path = self.path.clone(); + let tree_id = self.tree.tree_id; + + let mut client = self + .client + .lock() + .map_err(|e| VfsError::Io(e.to_string()))?; + let conn = client.connection_mut(); + + use smb2::client::connection::CompoundOp; + use smb2::msg::close::CloseRequest; + use smb2::msg::create::{ + CreateDisposition, CreateRequest, CreateResponse, ImpersonationLevel, ShareAccess, + }; + use smb2::msg::query_info::InfoType; + use smb2::msg::set_info::SetInfoRequest; + use smb2::pack::{ReadCursor, Unpack}; + use smb2::types::flags::FileAccessMask; + use smb2::types::status::NtStatus; + use smb2::types::{Command, CreditCharge, FileId, OplockLevel}; + + const FILE_END_OF_FILE_INFORMATION: u8 = 14; + + let create_req = CreateRequest { + requested_oplock_level: OplockLevel::None, + impersonation_level: ImpersonationLevel::Impersonation, + desired_access: FileAccessMask::new( + FileAccessMask::FILE_WRITE_DATA | FileAccessMask::SYNCHRONIZE, + ), + file_attributes: 0, + share_access: ShareAccess( + ShareAccess::FILE_SHARE_READ + | ShareAccess::FILE_SHARE_WRITE + | ShareAccess::FILE_SHARE_DELETE, + ), + create_disposition: CreateDisposition::FileOpen, + create_options: 0, + name: path, + create_contexts: vec![], + }; + + let setinfo_buf = size.to_le_bytes().to_vec(); + let setinfo_req = SetInfoRequest { + info_type: InfoType::File, + file_info_class: FILE_END_OF_FILE_INFORMATION, + additional_information: 0, + file_id: FileId::SENTINEL, + buffer: setinfo_buf, + }; + + let close_req = CloseRequest { + flags: 0, + file_id: FileId::SENTINEL, + }; + + let ops = [ + CompoundOp { + command: Command::Create, + body: &create_req, + tree_id: Some(tree_id), + credit_charge: CreditCharge(1), + }, + CompoundOp { + command: Command::SetInfo, + body: &setinfo_req, + tree_id: Some(tree_id), + credit_charge: CreditCharge(1), + }, + CompoundOp { + command: Command::Close, + body: &close_req, + tree_id: Some(tree_id), + credit_charge: CreditCharge(1), + }, + ]; + + let responses = self.runtime.block_on(async { + let frames = conn + .execute_compound(&ops) + .await + .map_err(|e| VfsError::Io(format!("SMB set_len compound failed: {}", e)))?; + let frames: Vec<_> = frames + .into_iter() + .collect::, _>>() + .map_err(|e| VfsError::Io(format!("SMB set_len waiter error: {}", e)))?; + Ok::<_, VfsError>(frames) + })?; + + let create_header = &responses[0].header; + let create_body = &responses[0].body; + let setinfo_header = &responses[1].header; + + if create_header.status != NtStatus::SUCCESS { + return Err(VfsError::NotFound(format!( + "SMB set_len: file not found ({})", + create_header.status + ))); + } + + if setinfo_header.status != NtStatus::SUCCESS { + let mut cursor = ReadCursor::new(create_body); + if let Ok(create_resp) = CreateResponse::unpack(&mut cursor) { + let standalone_close = CloseRequest { + flags: 0, + file_id: create_resp.file_id, + }; + let _: Result<_, _> = self.runtime.block_on( + conn.execute(Command::Close, &standalone_close, Some(tree_id)), + ); + } + return Err(VfsError::Io(format!( + "SMB set_len: SET_INFO failed ({})", + setinfo_header.status + ))); + } + + self.size = size; + if (size as usize) < self.data.len() { + self.data.truncate(size as usize); + } + + Ok(()) } } impl Drop for SmbVfsFile { fn drop(&mut self) { if let FileMode::Write = self.mode { - if !self.write_buf.is_empty() { + if let Some(writer) = self.file_writer.take() { + let _ = self.runtime.block_on(writer.finish()); + } else if !self.write_buf.is_empty() { let data = std::mem::take(&mut self.write_buf); if let Ok(mut client) = self.client.lock() { let _ =