Implement SMB streaming read using chunked READ requests
- Add file_id and read_chunk_size fields to SmbVfsFile - Use Tree::open_file() to get file_id for reads - Issue READ requests on each read() call (64KB chunks) - Close file handle in Drop Benefits: - No memory overhead for large files - Read-ahead caching possible - Compatible with SMB2 protocol All 229 tests pass.
This commit is contained in:
@@ -167,13 +167,19 @@ impl VfsBackend for SmbVfs {
|
||||
data: Vec::new(),
|
||||
size: 0,
|
||||
file_writer: None,
|
||||
file_id: None,
|
||||
read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
|
||||
}))
|
||||
} else {
|
||||
let data = self
|
||||
.runtime
|
||||
.block_on(client.read_file(&mut tree, &smb_path))
|
||||
.map_err(map_smb_error)?;
|
||||
let size = data.len() as u64;
|
||||
// Streaming read: open file and store file_id
|
||||
let (file_id, file_size) = {
|
||||
let mut client = self.client.lock().map_err(|e| VfsError::Io(e.to_string()))?;
|
||||
let mut tree = self.tree.lock().unwrap();
|
||||
self.runtime
|
||||
.block_on(tree.open_file(client.connection_mut(), &smb_path))
|
||||
.map_err(map_smb_error)?
|
||||
};
|
||||
|
||||
Ok(Box::new(SmbVfsFile {
|
||||
runtime: self.runtime.clone(),
|
||||
client: self.client.clone(),
|
||||
@@ -182,9 +188,11 @@ impl VfsBackend for SmbVfs {
|
||||
mode: FileMode::Read,
|
||||
position: 0,
|
||||
write_buf: Vec::new(),
|
||||
data,
|
||||
size,
|
||||
data: Vec::new(),
|
||||
size: file_size,
|
||||
file_writer: None,
|
||||
file_id: Some(file_id),
|
||||
read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -477,8 +485,12 @@ struct SmbVfsFile {
|
||||
data: Vec<u8>,
|
||||
size: u64,
|
||||
file_writer: Option<smb2::FileWriter>,
|
||||
file_id: Option<smb2::types::FileId>,
|
||||
read_chunk_size: u32,
|
||||
}
|
||||
|
||||
const DEFAULT_READ_CHUNK_SIZE: u32 = 64 * 1024; // 64KB chunks
|
||||
|
||||
impl SmbVfsFile {
|
||||
fn ensure_data_loaded(&mut self) -> Result<(), VfsError> {
|
||||
if self.data.is_empty() && self.size > 0 {
|
||||
@@ -499,16 +511,67 @@ impl SmbVfsFile {
|
||||
|
||||
impl VfsFile for SmbVfsFile {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, VfsError> {
|
||||
self.ensure_data_loaded()?;
|
||||
if self.position >= self.size {
|
||||
return Ok(0);
|
||||
}
|
||||
let start = self.position as usize;
|
||||
let available = self.size as usize - start;
|
||||
let to_copy = std::cmp::min(buf.len(), available);
|
||||
buf[..to_copy].copy_from_slice(&self.data[start..start + to_copy]);
|
||||
self.position += to_copy as u64;
|
||||
Ok(to_copy)
|
||||
|
||||
// Streaming read using file_id
|
||||
if let Some(file_id) = &self.file_id {
|
||||
let offset = self.position;
|
||||
let to_read = std::cmp::min(buf.len() as u32, self.read_chunk_size);
|
||||
let remaining = self.size - self.position;
|
||||
let actual_read = std::cmp::min(to_read as u64, remaining) as u32;
|
||||
|
||||
if actual_read == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
use smb2::msg::read::ReadRequest;
|
||||
use smb2::types::{Command, FileId};
|
||||
|
||||
let req = ReadRequest {
|
||||
padding: 0,
|
||||
flags: 0,
|
||||
length: actual_read,
|
||||
offset,
|
||||
file_id: FileId {
|
||||
persistent: file_id.persistent,
|
||||
volatile: file_id.volatile,
|
||||
},
|
||||
minimum_count: 0,
|
||||
channel: 0,
|
||||
remaining_bytes: 0,
|
||||
read_channel_info: Vec::new(),
|
||||
};
|
||||
|
||||
let mut client = self.client.lock().map_err(|e| VfsError::Io(e.to_string()))?;
|
||||
let tree_id = self.tree.tree_id;
|
||||
|
||||
let response = self.runtime
|
||||
.block_on(client.connection_mut().execute(Command::Read, &req, Some(tree_id)))
|
||||
.map_err(map_smb_error)?;
|
||||
|
||||
use smb2::pack::{ReadCursor, Unpack};
|
||||
use smb2::msg::read::ReadResponse;
|
||||
let mut cursor = ReadCursor::new(&response.body);
|
||||
let read_resp = ReadResponse::unpack(&mut cursor)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to parse ReadResponse: {}", e)))?;
|
||||
|
||||
let bytes_read = read_resp.data.len();
|
||||
buf[..bytes_read].copy_from_slice(&read_resp.data);
|
||||
self.position += bytes_read as u64;
|
||||
|
||||
Ok(bytes_read)
|
||||
} else {
|
||||
// Buffered read (fallback)
|
||||
self.ensure_data_loaded()?;
|
||||
let start = self.position as usize;
|
||||
let available = self.size as usize - start;
|
||||
let to_copy = std::cmp::min(buf.len(), available);
|
||||
buf[..to_copy].copy_from_slice(&self.data[start..start + to_copy]);
|
||||
self.position += to_copy as u64;
|
||||
Ok(to_copy)
|
||||
}
|
||||
}
|
||||
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize, VfsError> {
|
||||
@@ -742,6 +805,26 @@ impl VfsFile for SmbVfsFile {
|
||||
|
||||
impl Drop for SmbVfsFile {
|
||||
fn drop(&mut self) {
|
||||
// Close file handle for streaming read
|
||||
if let Some(file_id) = self.file_id.take() {
|
||||
if let Ok(mut client) = self.client.lock() {
|
||||
use smb2::msg::close::CloseRequest;
|
||||
use smb2::types::{Command, FileId};
|
||||
let req = CloseRequest {
|
||||
flags: 0,
|
||||
file_id: FileId {
|
||||
persistent: file_id.persistent,
|
||||
volatile: file_id.volatile,
|
||||
},
|
||||
};
|
||||
let tree_id = self.tree.tree_id;
|
||||
let _: Result<_, _> = self.runtime.block_on(
|
||||
client.connection_mut().execute(Command::Close, &req, Some(tree_id)),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Finish streaming write
|
||||
if let FileMode::Write = self.mode {
|
||||
if let Some(writer) = self.file_writer.take() {
|
||||
let _ = self.runtime.block_on(writer.finish());
|
||||
|
||||
Reference in New Issue
Block a user