SMB performance optimization: pread/pwrite, tokio::sync::Mutex, direct response, fast-path

- VfsFile trait: add read_at()/write_at() with seek+read default impl
- LocalFs: override with real pread/pwrite (FileExt::read_at/write_at) — 1 syscall vs 2
- smb_server_backend: use read_at/write_at + tokio::sync::Mutex (non-blocking async)
- read handler: build response directly, avoid Bytes→Vec<u8> copy + intermediate struct
- oplock break: fast-path skip when ≤1 open entry (single-user scenario)
This commit is contained in:
Warren
2026-06-23 09:58:19 +08:00
parent e7863a3034
commit d4f60929fa
6 changed files with 58 additions and 31 deletions

View File

@@ -3,7 +3,7 @@ use super::util;
use super::{VfsAce, VfsAceFlag, VfsAceMask, VfsAceType, VfsAcl, VfsBackend, VfsDirEntry, VfsError, VfsFile, VfsPreviousVersion, VfsQuota, VfsQuotaUsage, VfsSnapshotInfo, VfsStat}; use super::{VfsAce, VfsAceFlag, VfsAceMask, VfsAceType, VfsAcl, VfsBackend, VfsDirEntry, VfsError, VfsFile, VfsPreviousVersion, VfsQuota, VfsQuotaUsage, VfsSnapshotInfo, VfsStat};
use std::fs::{self, File, OpenOptions}; use std::fs::{self, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write}; use std::io::{Read, Seek, SeekFrom, Write};
use std::os::unix::fs::{MetadataExt, PermissionsExt}; use std::os::unix::fs::{FileExt, MetadataExt, PermissionsExt};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::SystemTime; use std::time::SystemTime;
@@ -42,6 +42,14 @@ impl VfsFile for LocalFile {
self.file.seek(pos).map_err(|e| VfsError::Io(e.to_string())) self.file.seek(pos).map_err(|e| VfsError::Io(e.to_string()))
} }
fn read_at(&mut self, buf: &mut [u8], offset: u64) -> Result<usize, VfsError> {
self.file.read_at(buf, offset).map_err(|e| VfsError::Io(e.to_string()))
}
fn write_at(&mut self, buf: &[u8], offset: u64) -> Result<usize, VfsError> {
self.file.write_at(buf, offset).map_err(|e| VfsError::Io(e.to_string()))
}
fn flush(&mut self) -> Result<(), VfsError> { fn flush(&mut self) -> Result<(), VfsError> {
self.file.flush().map_err(|e| VfsError::Io(e.to_string())) self.file.flush().map_err(|e| VfsError::Io(e.to_string()))
} }

View File

@@ -103,6 +103,20 @@ pub trait VfsFile: Send {
fn stat(&mut self) -> Result<VfsStat, VfsError>; fn stat(&mut self) -> Result<VfsStat, VfsError>;
fn set_len(&mut self, size: u64) -> Result<(), VfsError>; fn set_len(&mut self, size: u64) -> Result<(), VfsError>;
/// Read at `offset` without changing the seek position (like pread).
/// Default implementation does seek + read.
fn read_at(&mut self, buf: &mut [u8], offset: u64) -> Result<usize, VfsError> {
self.seek(std::io::SeekFrom::Start(offset))?;
self.read(buf)
}
/// Write at `offset` without changing the seek position (like pwrite).
/// Default implementation does seek + write.
fn write_at(&mut self, buf: &[u8], offset: u64) -> Result<usize, VfsError> {
self.seek(std::io::SeekFrom::Start(offset))?;
self.write(buf)
}
/// Write all bytes (convenience, default loops write() until done) /// Write all bytes (convenience, default loops write() until done)
fn write_all(&mut self, mut buf: &[u8]) -> Result<(), VfsError> { fn write_all(&mut self, mut buf: &[u8]) -> Result<(), VfsError> {
while !buf.is_empty() { while !buf.is_empty() {

View File

@@ -1,7 +1,7 @@
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::Mutex;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
@@ -211,11 +211,9 @@ impl Handle for VfsHandle {
async fn read(&self, offset: u64, len: u32) -> Result<Bytes, SmbError> { async fn read(&self, offset: u64, len: u32) -> Result<Bytes, SmbError> {
match self { match self {
Self::File { file, .. } => { Self::File { file, .. } => {
let mut file = file.lock().unwrap(); let mut file = file.lock().await;
file.seek(std::io::SeekFrom::Start(offset))
.map_err(vfs_error_to_io)?;
let mut buf = vec![0u8; len as usize]; let mut buf = vec![0u8; len as usize];
let n = file.read(&mut buf).map_err(map_error)?; let n = file.read_at(&mut buf, offset).map_err(map_error)?;
buf.truncate(n); buf.truncate(n);
Ok(Bytes::from(buf)) Ok(Bytes::from(buf))
} }
@@ -226,10 +224,8 @@ impl Handle for VfsHandle {
async fn write(&self, offset: u64, data: &[u8]) -> Result<u32, SmbError> { async fn write(&self, offset: u64, data: &[u8]) -> Result<u32, SmbError> {
match self { match self {
Self::File { file, .. } => { Self::File { file, .. } => {
let mut file = file.lock().unwrap(); let mut file = file.lock().await;
file.seek(std::io::SeekFrom::Start(offset)) let n = file.write_at(data, offset).map_err(map_error)?;
.map_err(vfs_error_to_io)?;
let n = file.write(data).map_err(map_error)?;
Ok(n as u32) Ok(n as u32)
} }
Self::Directory { .. } => Err(SmbError::NotSupported), Self::Directory { .. } => Err(SmbError::NotSupported),
@@ -239,7 +235,7 @@ impl Handle for VfsHandle {
async fn flush(&self) -> Result<(), SmbError> { async fn flush(&self) -> Result<(), SmbError> {
match self { match self {
Self::File { file, .. } => { Self::File { file, .. } => {
let mut file = file.lock().unwrap(); let mut file = file.lock().await;
file.flush().map_err(map_error) file.flush().map_err(map_error)
} }
Self::Directory { .. } => Ok(()), Self::Directory { .. } => Ok(()),
@@ -249,7 +245,7 @@ impl Handle for VfsHandle {
async fn stat(&self) -> Result<FileInfo, SmbError> { async fn stat(&self) -> Result<FileInfo, SmbError> {
match self { match self {
Self::File { file, path, .. } => { Self::File { file, path, .. } => {
let mut f = file.lock().unwrap(); let mut f = file.lock().await;
let vfs_stat = f.stat().map_err(map_error)?; let vfs_stat = f.stat().map_err(map_error)?;
Ok(vfs_stat_to_file_info(&vfs_stat, "", path)) Ok(vfs_stat_to_file_info(&vfs_stat, "", path))
} }
@@ -278,7 +274,7 @@ impl Handle for VfsHandle {
async fn truncate(&self, len: u64) -> Result<(), SmbError> { async fn truncate(&self, len: u64) -> Result<(), SmbError> {
match self { match self {
Self::File { file, .. } => { Self::File { file, .. } => {
let mut file = file.lock().unwrap(); let mut file = file.lock().await;
file.set_len(len).map_err(map_error) file.set_len(len).map_err(map_error)
} }
Self::Directory { .. } => Err(SmbError::NotSupported), Self::Directory { .. } => Err(SmbError::NotSupported),

View File

@@ -207,8 +207,8 @@ pub trait Handle: Send + Sync {
/// Write `data` at `offset`. Returns bytes written. /// Write `data` at `offset`. Returns bytes written.
async fn write(&self, offset: u64, data: &[u8]) -> SmbResult<u32>; async fn write(&self, offset: u64, data: &[u8]) -> SmbResult<u32>;
/// Write owned `data` at `offset`. Backends that need ownership across a /// Write owned `data` at `offset`. Backends needing ownership across a
/// blocking boundary can override this to avoid an extra copy. /// blocking boundary should override to avoid an extra copy.
async fn write_owned(&self, offset: u64, data: Vec<u8>) -> SmbResult<u32> { async fn write_owned(&self, offset: u64, data: Vec<u8>) -> SmbResult<u32> {
self.write(offset, &data).await self.write(offset, &data).await
} }

View File

@@ -91,16 +91,15 @@ pub async fn handle(
if bytes.is_empty() && req.length > 0 { if bytes.is_empty() && req.length > 0 {
return HandlerResponse::err(ntstatus::STATUS_END_OF_FILE); return HandlerResponse::err(ntstatus::STATUS_END_OF_FILE);
} }
let resp = ReadResponse { // Build response directly to avoid Bytes→Vec<u8> copy and intermediate struct
structure_size: 17, let data_len = bytes.len() as u32;
data_offset: ReadResponse::STANDARD_DATA_OFFSET, let mut buf = Vec::with_capacity(16 + bytes.len());
reserved: 0, buf.extend_from_slice(&17u16.to_le_bytes()); // structure_size
data_length: bytes.len() as u32, buf.push(ReadResponse::STANDARD_DATA_OFFSET); // data_offset
data_remaining: 0, buf.push(0); // reserved
flags: 0, buf.extend_from_slice(&data_len.to_le_bytes()); // data_length
data: bytes.to_vec(), buf.extend_from_slice(&0u32.to_le_bytes()); // data_remaining
}; buf.extend_from_slice(&0u32.to_le_bytes()); // flags
let mut buf = Vec::new(); buf.extend_from_slice(&bytes); // data
resp.write_to(&mut buf).expect("encode");
HandlerResponse::ok(buf) HandlerResponse::ok(buf)
} }

View File

@@ -112,17 +112,23 @@ impl OplockManager {
new_share_access: u32, new_share_access: u32,
new_granted_access: Access, new_granted_access: Access,
) -> Vec<OplockBreakNotification> { ) -> Vec<OplockBreakNotification> {
// Fast-path: no entries or single entry can't conflict with itself
let entry_count = {
let file_opens = self.file_opens.read().await;
file_opens.get(path).map_or(0, |e| e.len())
};
if entry_count <= 1 {
return Vec::new();
}
let mut notifications = Vec::new(); let mut notifications = Vec::new();
let mut file_opens = self.file_opens.write().await; let mut file_opens = self.file_opens.write().await;
if let Some(entries) = file_opens.get_mut(path) { if let Some(entries) = file_opens.get_mut(path) {
for entry in entries.iter_mut() { for entry in entries.iter_mut() {
// Check if new open conflicts with existing oplock
if !share_access_compatible(entry.share_access, new_share_access) { if !share_access_compatible(entry.share_access, new_share_access) {
// Need to break the oplock let new_level = OplockLevel::Ii as u8;
let new_level = OplockLevel::Ii as u8; // Downgrade to Level II
// Build notification (MS-SMB2 §2.2.23.1)
notifications.push(OplockBreakNotification { notifications.push(OplockBreakNotification {
structure_size: 24, structure_size: 24,
oplock_level: new_level, oplock_level: new_level,
@@ -131,7 +137,6 @@ impl OplockManager {
file_id: entry.file_id, file_id: entry.file_id,
}); });
// Update entry's oplock level
entry.oplock_level = new_level; entry.oplock_level = new_level;
} }
} }
@@ -266,6 +271,11 @@ impl LeaseManager {
/// Break lease when conflicting access occurs (MS-SMB2 §3.3.5.10). /// Break lease when conflicting access occurs (MS-SMB2 §3.3.5.10).
pub async fn break_lease(&self, requested_state: u32) -> Vec<LeaseBreakNotification> { pub async fn break_lease(&self, requested_state: u32) -> Vec<LeaseBreakNotification> {
// Fast-path: no leases to break
if self.leases.read().await.is_empty() {
return Vec::new();
}
let mut leases = self.leases.write().await; let mut leases = self.leases.write().await;
let mut notifications = Vec::new(); let mut notifications = Vec::new();