From 7b033e5276a66dae156f8ad5111cbd19182e402c Mon Sep 17 00:00:00 2001 From: Warren Date: Sat, 20 Jun 2026 21:24:55 +0800 Subject: [PATCH] Implement SMB streaming read using chunked READ requests - Add file_id and read_chunk_size fields to SmbVfsFile - Use Tree::open_file() to get file_id for reads - Issue READ requests on each read() call (64KB chunks) - Close file handle in Drop Benefits: - No memory overhead for large files - Read-ahead caching possible - Compatible with SMB2 protocol All 229 tests pass. --- markbase-core/src/vfs/smb_fs.rs | 111 ++++++++++++++++++++++++++++---- 1 file changed, 97 insertions(+), 14 deletions(-) diff --git a/markbase-core/src/vfs/smb_fs.rs b/markbase-core/src/vfs/smb_fs.rs index fa7a14d..67fcecd 100644 --- a/markbase-core/src/vfs/smb_fs.rs +++ b/markbase-core/src/vfs/smb_fs.rs @@ -167,13 +167,19 @@ impl VfsBackend for SmbVfs { data: Vec::new(), size: 0, file_writer: None, + file_id: None, + read_chunk_size: DEFAULT_READ_CHUNK_SIZE, })) } else { - let data = self - .runtime - .block_on(client.read_file(&mut tree, &smb_path)) - .map_err(map_smb_error)?; - let size = data.len() as u64; + // Streaming read: open file and store file_id + let (file_id, file_size) = { + let mut client = self.client.lock().map_err(|e| VfsError::Io(e.to_string()))?; + let mut tree = self.tree.lock().unwrap(); + self.runtime + .block_on(tree.open_file(client.connection_mut(), &smb_path)) + .map_err(map_smb_error)? + }; + Ok(Box::new(SmbVfsFile { runtime: self.runtime.clone(), client: self.client.clone(), @@ -182,9 +188,11 @@ impl VfsBackend for SmbVfs { mode: FileMode::Read, position: 0, write_buf: Vec::new(), - data, - size, + data: Vec::new(), + size: file_size, file_writer: None, + file_id: Some(file_id), + read_chunk_size: DEFAULT_READ_CHUNK_SIZE, })) } } @@ -477,8 +485,12 @@ struct SmbVfsFile { data: Vec, size: u64, file_writer: Option, + file_id: Option, + read_chunk_size: u32, } +const DEFAULT_READ_CHUNK_SIZE: u32 = 64 * 1024; // 64KB chunks + impl SmbVfsFile { fn ensure_data_loaded(&mut self) -> Result<(), VfsError> { if self.data.is_empty() && self.size > 0 { @@ -499,16 +511,67 @@ impl SmbVfsFile { impl VfsFile for SmbVfsFile { fn read(&mut self, buf: &mut [u8]) -> Result { - self.ensure_data_loaded()?; if self.position >= self.size { return Ok(0); } - let start = self.position as usize; - let available = self.size as usize - start; - let to_copy = std::cmp::min(buf.len(), available); - buf[..to_copy].copy_from_slice(&self.data[start..start + to_copy]); - self.position += to_copy as u64; - Ok(to_copy) + + // Streaming read using file_id + if let Some(file_id) = &self.file_id { + let offset = self.position; + let to_read = std::cmp::min(buf.len() as u32, self.read_chunk_size); + let remaining = self.size - self.position; + let actual_read = std::cmp::min(to_read as u64, remaining) as u32; + + if actual_read == 0 { + return Ok(0); + } + + use smb2::msg::read::ReadRequest; + use smb2::types::{Command, FileId}; + + let req = ReadRequest { + padding: 0, + flags: 0, + length: actual_read, + offset, + file_id: FileId { + persistent: file_id.persistent, + volatile: file_id.volatile, + }, + minimum_count: 0, + channel: 0, + remaining_bytes: 0, + read_channel_info: Vec::new(), + }; + + let mut client = self.client.lock().map_err(|e| VfsError::Io(e.to_string()))?; + let tree_id = self.tree.tree_id; + + let response = self.runtime + .block_on(client.connection_mut().execute(Command::Read, &req, Some(tree_id))) + .map_err(map_smb_error)?; + + use smb2::pack::{ReadCursor, Unpack}; + use smb2::msg::read::ReadResponse; + let mut cursor = ReadCursor::new(&response.body); + let read_resp = ReadResponse::unpack(&mut cursor) + .map_err(|e| VfsError::Io(format!("Failed to parse ReadResponse: {}", e)))?; + + let bytes_read = read_resp.data.len(); + buf[..bytes_read].copy_from_slice(&read_resp.data); + self.position += bytes_read as u64; + + Ok(bytes_read) + } else { + // Buffered read (fallback) + self.ensure_data_loaded()?; + let start = self.position as usize; + let available = self.size as usize - start; + let to_copy = std::cmp::min(buf.len(), available); + buf[..to_copy].copy_from_slice(&self.data[start..start + to_copy]); + self.position += to_copy as u64; + Ok(to_copy) + } } fn write(&mut self, buf: &[u8]) -> Result { @@ -742,6 +805,26 @@ impl VfsFile for SmbVfsFile { impl Drop for SmbVfsFile { fn drop(&mut self) { + // Close file handle for streaming read + if let Some(file_id) = self.file_id.take() { + if let Ok(mut client) = self.client.lock() { + use smb2::msg::close::CloseRequest; + use smb2::types::{Command, FileId}; + let req = CloseRequest { + flags: 0, + file_id: FileId { + persistent: file_id.persistent, + volatile: file_id.volatile, + }, + }; + let tree_id = self.tree.tree_id; + let _: Result<_, _> = self.runtime.block_on( + client.connection_mut().execute(Command::Close, &req, Some(tree_id)), + ); + } + } + + // Finish streaming write if let FileMode::Write = self.mode { if let Some(writer) = self.file_writer.take() { let _ = self.runtime.block_on(writer.finish());