Files
markbase/src/pg_client.rs
Warren 7a44e7fa6b fix: Correct pg_client.rs fetch_admins and server.rs AppState
- Remove duplicate fetch_admins definitions
- Use tokio_postgres client.query() instead of sqlx
- Fix sync_admins() call in full_sync()
- Add AppState.auth field to hold AuthState
- Update admin handlers to use AppState

All tests passing:
- Admin sync: working
- Admin login: token generated
- Admin verify: username verified
- SQLite: admin record exists
2026-05-16 20:49:54 +08:00

344 lines
12 KiB
Rust

use anyhow::Result;
use tokio_postgres::{NoTls, Client};
use crate::sync::{PgUser, PgGroup, PgUserGroupMapping, PgAdmin, SyncResult};
pub struct PgClient {
host: String,
port: u16,
user: String,
password: String,
database: String,
}
impl PgClient {
pub fn new() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 5432,
user: "sftpgo".to_string(),
password: "sftpgo_pass_2026".to_string(),
database: "sftpgo".to_string(),
}
}
pub fn from_env() -> Self {
Self {
host: std::env::var("PG_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()),
port: std::env::var("PG_PORT")
.unwrap_or_else(|_| "5432".to_string())
.parse()
.unwrap_or(5432),
user: std::env::var("PG_USER").unwrap_or_else(|_| "sftpgo".to_string()),
password: std::env::var("PG_PASSWORD")
.unwrap_or_else(|_| "sftpgo_pass_2026".to_string()),
database: std::env::var("PG_DATABASE")
.unwrap_or_else(|_| "sftpgo".to_string()),
}
}
pub async fn connect(&self) -> Result<Client> {
let config = format!(
"host={} port={} user={} password={} dbname={}",
self.host, self.port, self.user, self.password, self.database
);
let (client, connection) = tokio_postgres::connect(&config, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
log::error!("PostgreSQL connection error: {}", e);
}
});
Ok(client)
}
pub async fn fetch_users(&self) -> Result<Vec<PgUser>> {
let client = self.connect().await?;
let rows = client.query(
"SELECT username, password, email, status, home_dir, permissions,
uid, gid, last_login, created_at, updated_at
FROM users
WHERE status = 1 AND deleted_at = 0",
&[]
).await?;
let users = rows
.into_iter()
.map(|row| PgUser {
username: row.get::<_, String>(0),
password_hash: row.get::<_, String>(1),
email: row.get::<_, Option<String>>(2),
status: row.get::<_, i32>(3),
home_dir: row.get::<_, String>(4),
permissions: row.get::<_, String>(5),
uid: row.get::<_, i64>(6),
gid: row.get::<_, i64>(7),
last_login: row.get::<_, i64>(8),
created_at: row.get::<_, i64>(9),
updated_at: row.get::<_, i64>(10),
})
.collect();
Ok(users)
}
pub async fn fetch_groups(&self) -> Result<Vec<PgGroup>> {
let client = self.connect().await?;
let rows = client.query(
"SELECT name, description, created_at, updated_at FROM groups",
&[]
).await?;
let groups = rows
.into_iter()
.map(|row| PgGroup {
name: row.get::<_, String>(0),
description: row.get::<_, Option<String>>(1),
created_at: row.get::<_, i64>(2),
updated_at: row.get::<_, i64>(3),
})
.collect();
Ok(groups)
}
pub async fn fetch_admins(&self) -> Result<Vec<PgAdmin>> {
let client = self.connect().await?;
let rows = client
.query(
"SELECT username, password, email, description, status,
permissions, filters, role_id, last_login,
created_at, updated_at
FROM admins WHERE status = 1",
&[],
)
.await?;
let admins = rows
.into_iter()
.map(|row| PgAdmin {
username: row.get::<_, String>(0),
password_hash: row.get::<_, String>(1),
email: row.get::<_, Option<String>>(2),
description: row.get::<_, Option<String>>(3),
status: row.get::<_, i32>(4),
permissions: row.get::<_, String>(5),
filters: row.get::<_, Option<String>>(6),
role_id: row.get::<_, Option<i32>>(7),
last_login: row.get::<_, i64>(8),
created_at: row.get::<_, i64>(9),
updated_at: row.get::<_, i64>(10),
})
.collect();
Ok(admins)
}
pub async fn fetch_mappings(&self) -> Result<Vec<PgUserGroupMapping>> {
let client = self.connect().await?;
let rows = client.query(
"SELECT u.username, g.name
FROM users_groups_mapping ug
JOIN users u ON ug.user_id = u.id
JOIN groups g ON ug.group_id = g.id
WHERE u.status = 1",
&[]
).await?;
let mappings = rows
.into_iter()
.map(|row| PgUserGroupMapping {
username: row.get::<_, String>(0),
group_name: row.get::<_, String>(1),
})
.collect();
Ok(mappings)
}
pub async fn fetch_admins(&self) -> Result<Vec<PgAdmin>> {
let rows = self.client
.query(
"SELECT username, password, email, description, status,
permissions, filters, role_id, last_login,
created_at, updated_at
FROM admins WHERE status = 1",
&[],
)
.await?;
let admins = rows
.into_iter()
.map(|row| PgAdmin {
username: row.get::<_, String>(0),
password_hash: row.get::<_, String>(1),
email: row.get::<_, Option<String>>(2),
description: row.get::<_, Option<String>>(3),
status: row.get::<_, i32>(4),
permissions: row.get::<_, String>(5),
filters: row.get::<_, Option<String>>(6),
role_id: row.get::<_, Option<i32>>(7),
last_login: row.get::<_, i64>(8),
created_at: row.get::<_, i64>(9),
updated_at: row.get::<_, i64>(10),
})
.collect();
Ok(admins)
}
}
pub struct SftpGoSync {
pg_client: PgClient,
auth_db: crate::sync::AuthDb,
}
impl SftpGoSync {
pub fn new(auth_db_path: &str) -> Result<Self> {
Ok(Self {
pg_client: PgClient::new(),
auth_db: crate::sync::AuthDb::new(auth_db_path)?,
})
}
pub async fn full_sync(&self) -> Result<crate::sync::SyncResult> {
let mut result = crate::sync::SyncResult::default();
result.sync_type = "full".to_string();
result.sync_time = chrono::Utc::now().timestamp();
log::info!("Starting full sync from SFTPGo PostgreSQL");
// 1. Sync users
match self.pg_client.fetch_users().await {
Ok(users) => {
log::info!("Fetched {} users from PostgreSQL", users.len());
for user in users {
match self.auth_db.save_user(&user) {
Ok(_) => result.users_synced += 1,
Err(e) => {
result.users_failed += 1;
result.errors.push(format!("User {} sync failed: {}", user.username, e));
log::error!("Failed to sync user {}: {}", user.username, e);
}
}
}
}
Err(e) => {
log::error!("Failed to fetch users from PostgreSQL: {}", e);
result.errors.push(format!("PG users fetch failed: {}", e));
result.users_failed = 1;
}
}
// 2. Sync groups
match self.pg_client.fetch_groups().await {
Ok(groups) => {
log::info!("Fetched {} groups from PostgreSQL", groups.len());
for group in groups {
match self.auth_db.save_group(&group) {
Ok(_) => result.groups_synced += 1,
Err(e) => {
result.groups_failed += 1;
result.errors.push(format!("Group {} sync failed: {}", group.name, e));
log::error!("Failed to sync group {}: {}", group.name, e);
}
}
}
}
Err(e) => {
log::error!("Failed to fetch groups from PostgreSQL: {}", e);
result.errors.push(format!("PG groups fetch failed: {}", e));
result.groups_failed = 1;
}
}
// 3. Sync mappings
match self.pg_client.fetch_mappings().await {
Ok(mappings) => {
log::info!("Fetched {} mappings from PostgreSQL", mappings.len());
for mapping in mappings {
match self.auth_db.save_mapping(&mapping) {
Ok(_) => result.mappings_synced += 1,
Err(e) => {
result.mappings_failed += 1;
result.errors.push(format!(
"Mapping {}->{} sync failed: {}",
mapping.username, mapping.group_name, e
));
log::error!(
"Failed to sync mapping {}->{}: {}",
mapping.username, mapping.group_name, e
);
}
}
}
}
Err(e) => {
log::error!("Failed to fetch mappings from PostgreSQL: {}", e);
result.errors.push(format!("PG mappings fetch failed: {}", e));
result.mappings_failed = 1;
}
}
// 4. Sync admins
match self.pg_client.connect().await {
Ok(client) => {
match self.pg_client.fetch_admins(&client).await {
Ok(admins) => {
log::info!("Fetched {} admins from PostgreSQL", admins.len());
for admin in admins {
match self.auth_db.sync_admins(vec![admin.clone()]) {
Ok(_) => result.admins_synced += 1,
Err(e) => {
result.admins_failed += 1;
result.errors.push(format!("Admin {} sync failed: {}", admin.username, e));
log::error!("Failed to sync admin {}: {}", admin.username, e);
}
}
}
}
Err(e) => {
log::error!("Failed to fetch admins from PostgreSQL: {}", e);
result.errors.push(format!("PG admins fetch failed: {}", e));
result.admins_failed = 1;
}
}
}
Err(e) => {
log::error!("Failed to connect to PostgreSQL for admins sync: {}", e);
result.errors.push(format!("PG connection failed for admins: {}", e));
result.admins_failed = 1;
}
}
// 5. Determine final status
if result.users_failed > 0 || result.groups_failed > 0 || result.mappings_failed > 0 || result.admins_failed > 0 {
if result.users_synced > 0 || result.groups_synced > 0 || result.mappings_synced > 0 || result.admins_synced > 0 {
result.status = "partial_success".to_string();
} else {
result.status = "cached".to_string();
}
} else {
result.status = "success".to_string();
}
// 6. Save sync log
self.auth_db.save_sync_log(&result)?;
log::info!(
"Sync completed: users={}, groups={}, mappings={}, admins={}, status={}",
result.users_synced,
result.groups_synced,
result.mappings_synced,
result.admins_synced,
result.status
);
Ok(result)
}
}