diff --git a/Cargo.lock b/Cargo.lock index 792740d..a59a813 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,12 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "aead" version = "0.5.2" @@ -279,6 +285,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -348,6 +355,26 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -441,6 +468,25 @@ dependencies = [ "rustversion", ] +[[package]] +name = "cbindgen" +version = "0.29.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "befbfd072a8e81c02f8c507aefce431fe5e7d051f83d48a23ffc9b9fe5a11799" +dependencies = [ + "clap", + "heck 0.5.0", + "indexmap 2.13.0", + "log", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn 2.0.117", + "tempfile", + "toml", +] + [[package]] name = "cc" version = "1.2.57" @@ -448,9 +494,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" dependencies = [ "find-msvc-tools", + "jobserver", + "libc", "shlex", ] +[[package]] +name = "cedarwood" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d910bedd62c24733263d0bed247460853c9d22e8956bd4cd964302095e04e90" +dependencies = [ + "smallvec", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -608,6 +665,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -823,6 +889,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dary_heap" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06d2e3287df1c007e74221c49ca10a95d557349e54b3a75dc2fb14712c751f04" + [[package]] name = "data-encoding" version = "2.10.0" @@ -1057,6 +1129,37 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "ferrous-opencc" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "800f9a39ad26b200b08673c059c2dbf78d1f63901a873e9ac425ba471e690410" +dependencies = [ + "anyhow", + "bincode", + "cbindgen", + "ferrous-opencc-compiler", + "fst", + "phf", + "phf_codegen", + "serde", + "serde_json", + "thiserror 2.0.18", + "zstd", +] + +[[package]] +name = "ferrous-opencc-compiler" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed87f42a19a234e791d6e769928819382c3d210da2c9049f832a1ae772d7082" +dependencies = [ + "anyhow", + "bincode", + "fst", + "zstd", +] + [[package]] name = "filetime" version = "0.2.27" @@ -1107,6 +1210,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1140,6 +1249,12 @@ dependencies = [ "libc", ] +[[package]] +name = "fst" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a" + [[package]] name = "funty" version = "2.0.0" @@ -1338,7 +1453,7 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -1346,6 +1461,11 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] [[package]] name = "hashlink" @@ -1698,6 +1818,39 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "include-flate" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a05fb00d9abc625268e0573a519506b264a7d6965de09bac13201bfb44e723d" +dependencies = [ + "include-flate-codegen", + "include-flate-compress", +] + +[[package]] +name = "include-flate-codegen" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92c3c319a7527668538a8530c541e74e881e94c4f41e1425622d0a41c16468af" +dependencies = [ + "include-flate-compress", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "include-flate-compress" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed0bd9ea81b94169d61c5a397e9faef02153d3711fc62d3270bcde3ac85380d9" +dependencies = [ + "libflate", + "zstd", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -1829,6 +1982,29 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +[[package]] +name = "jieba-macros" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "348294e44ee7e3c42685da656490f8febc7359632544019621588902216da95c" +dependencies = [ + "phf_codegen", +] + +[[package]] +name = "jieba-rs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "766bd7012aa5ba49411ebdf4e93bddd59b182d2918e085d58dec5bb9b54b7105" +dependencies = [ + "cedarwood", + "include-flate", + "jieba-macros", + "phf", + "regex", + "rustc-hash", +] + [[package]] name = "jiff" version = "0.2.23" @@ -1853,6 +2029,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + [[package]] name = "js-sys" version = "0.3.91" @@ -1904,6 +2090,30 @@ version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +[[package]] +name = "libflate" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3248b8d211bd23a104a42d81b4fa8bb8ac4a3b75e7a43d85d2c9ccb6179cd74" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a599cb10a9cd92b1300debcef28da8f70b935ec937f44fcd1b70a7c986a11c5c" +dependencies = [ + "core2", + "hashbrown 0.16.1", + "rle-decode-fast", +] + [[package]] name = "libm" version = "0.2.16" @@ -2116,8 +2326,10 @@ dependencies = [ "crossterm", "dotenv", "env_logger", + "ferrous-opencc", "futures-util", "hex", + "jieba-rs", "libc", "md5", "moka", @@ -2190,6 +2402,23 @@ dependencies = [ "webpki-roots 0.25.4", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "native-tls" version = "0.2.18" @@ -2440,6 +2669,59 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "phf" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1562dc717473dbaa4c1f85a36410e03c047b2e7df7f45ee938fbef64ae7fadf" +dependencies = [ + "phf_macros", + "phf_shared", + "serde", +] + +[[package]] +name = "phf_codegen" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49aa7f9d80421bca176ca8dbfebe668cc7a2684708594ec9f3c0db0805d5d6e1" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135ace3a761e564ec88c03a77317a7c6b80bb7f7135ef2544dbe054243b89737" +dependencies = [ + "fastrand", + "phf_shared", +] + +[[package]] +name = "phf_macros" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "812f032b54b1e759ccd5f8b6677695d5268c588701effba24601f6932f8269ef" +dependencies = [ + "phf_generator", + "phf_shared", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "phf_shared" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e57fef6bc5981e38c2ce2d63bfa546861309f875b8a75f092d1d54ae2d64f266" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.11" @@ -2599,6 +2881,28 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -2966,6 +3270,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rsa" version = "0.9.10" @@ -3323,6 +3633,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_spanned" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876ac351060d4f882bb1032b6369eb0aef79ad9df1ea8bc404874d8cc3d0cd98" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3458,6 +3777,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" +[[package]] +name = "siphasher" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" + [[package]] name = "slab" version = "0.4.12" @@ -4059,6 +4384,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.12+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863" +dependencies = [ + "indexmap 2.13.0", + "serde_core", + "serde_spanned", + "toml_datetime 0.7.5+spec-1.1.0", + "toml_parser", + "toml_writer", + "winnow 0.7.15", +] + +[[package]] +name = "toml_datetime" +version = "0.7.5+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" +dependencies = [ + "serde_core", +] + [[package]] name = "toml_datetime" version = "1.0.1+spec-1.1.0" @@ -4075,9 +4424,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ca1a40644a28bce036923f6a431df0b34236949d111cc07cb6dca830c9ef2e1" dependencies = [ "indexmap 2.13.0", - "toml_datetime", + "toml_datetime 1.0.1+spec-1.1.0", "toml_parser", - "winnow", + "winnow 1.0.0", ] [[package]] @@ -4086,9 +4435,15 @@ version = "1.0.10+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7df25b4befd31c4816df190124375d5a20c6b6921e2cad937316de3fccd63420" dependencies = [ - "winnow", + "winnow 1.0.0", ] +[[package]] +name = "toml_writer" +version = "1.1.0+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d282ade6016312faf3e41e57ebbba0c073e4056dab1232ab1cb624199648f8ed" + [[package]] name = "tonic" version = "0.12.3" @@ -4393,6 +4748,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "url" version = "2.5.8" @@ -4491,6 +4852,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "walkdir" version = "2.5.0" @@ -5029,6 +5396,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" + [[package]] name = "winnow" version = "1.0.0" @@ -5281,3 +5654,31 @@ name = "zmij" version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 2bc05c2..48edaef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,10 +33,14 @@ sha2 = "0.10" hex = "0.4" uuid = { version = "1.0", features = ["v4"] } -# Security -subtle = "2.5" -aes-gcm = "0.10" -base64 = "0.22" + # Security + subtle = "2.5" + aes-gcm = "0.10" + base64 = "0.22" + + # Text processing + jieba-rs = "0.8.1" + ferrous-opencc = { version = "0.3.1", features = ["s2t-conversion", "t2s-conversion"] } # Cache moka = { version = "0.12", features = ["future"] } @@ -50,7 +54,7 @@ qdrant-client = "1.7" reqwest = { version = "0.12", features = ["json"] } # HTTP Server -axum = "0.7" +axum = { version = "0.7", features = ["multipart"] } tower = "0.4" # API Documentation @@ -98,3 +102,11 @@ path = "src/playground.rs" [[bin]] name = "fix_chunks" path = "src/bin/fix_chunks.rs" + +[[bin]] +name = "migrate_chinese_text" +path = "src/bin/migrate_chinese_text.rs" + +[[bin]] +name = "test_bm25_simple" +path = "src/bin/test_bm25_simple.rs" diff --git a/src/core/config.rs b/src/core/config.rs index 089f8e3..6684422 100644 --- a/src/core/config.rs +++ b/src/core/config.rs @@ -78,6 +78,15 @@ pub static SERVER_PORT: Lazy = Lazy::new(|| { pub static REDIS_KEY_PREFIX: Lazy = Lazy::new(|| env::var("MOMENTRY_REDIS_PREFIX").unwrap_or_else(|_| "momentry:".to_string())); +pub static DATABASE_SCHEMA: Lazy = + Lazy::new(|| env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "public".to_string())); + +pub static MONGODB_DATABASE: Lazy = + Lazy::new(|| env::var("MONGODB_DATABASE").unwrap_or_else(|_| "momentry".to_string())); + +pub static QDRANT_COLLECTION: Lazy = + Lazy::new(|| env::var("QDRANT_COLLECTION").unwrap_or_else(|_| "momentry_rule1".to_string())); + pub mod processor { use super::*; diff --git a/src/core/db/mod.rs b/src/core/db/mod.rs index 1aba44e..c2d0bfb 100644 --- a/src/core/db/mod.rs +++ b/src/core/db/mod.rs @@ -1,6 +1,8 @@ use anyhow::Result; use async_trait::async_trait; +pub mod schema; + use crate::core::chunk::Chunk; #[derive(Debug, Clone)] diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 2b2eae4..e09b850 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -5,8 +5,13 @@ use sqlx::{postgres::PgPoolOptions, PgPool, Row}; use std::sync::Arc; use tokio::sync::RwLock; -use super::{Database, QdrantDb}; +use super::{schema, Database, QdrantDb}; use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType}; +use crate::core::text::synonym::normalize_chinese_query; +use crate::core::text::{ + global_synonym_expander, + tokenizer::{contains_chinese, tokenize_chinese_text}, +}; #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct StorageStatus { @@ -453,18 +458,41 @@ impl PostgresDb { acquire_timeout_secs ); + let schema = crate::core::config::DATABASE_SCHEMA.as_str(); + tracing::info!("Database schema configuration: {}", schema); + let pool_options = PgPoolOptions::new() .max_connections(max_connections) - .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs)); + .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs)) + .after_connect(move |conn, _| { + let schema = schema.to_string(); + tracing::debug!("after_connect: setting search_path to {}", schema); + Box::pin(async move { + if schema != "public" { + sqlx::query(&format!("SET search_path TO {}", schema)) + .execute(conn) + .await?; + } + Ok(()) + }) + }); let pool = pool_options.connect(database_url).await?; + if schema != "public" { + tracing::info!("Creating PostgreSQL schema if not exists: {}", schema); + sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema)) + .execute(&pool) + .await?; + } + let db = Self { pool, cache: Arc::new(RwLock::new(PostgresCache::default())), }; db.init_schema().await?; + Ok(db) } @@ -474,25 +502,29 @@ impl PostgresDb { } pub async fn register_video(&self, record: &VideoRecord) -> Result { + let table = schema::table_name("videos"); let result = sqlx::query( - r#" - INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, status, user_id, job_id) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, TRUE, $9, $10, $11) - ON CONFLICT (uuid) DO UPDATE SET - file_path = EXCLUDED.file_path, - file_name = EXCLUDED.file_name, - duration = EXCLUDED.duration, - width = EXCLUDED.width, - height = EXCLUDED.height, - fps = EXCLUDED.fps, - probe_json = EXCLUDED.probe_json, - fs_video = TRUE, - status = COALESCE(EXCLUDED.status, videos.status), - user_id = COALESCE(EXCLUDED.user_id, videos.user_id), - job_id = COALESCE(EXCLUDED.job_id, videos.job_id), - updated_at = CURRENT_TIMESTAMP - RETURNING id::bigint - "# + &format!( + r#" + INSERT INTO {} (uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, status, user_id, job_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, TRUE, $9, $10, $11) + ON CONFLICT (uuid) DO UPDATE SET + file_path = EXCLUDED.file_path, + file_name = EXCLUDED.file_name, + duration = EXCLUDED.duration, + width = EXCLUDED.width, + height = EXCLUDED.height, + fps = EXCLUDED.fps, + probe_json = EXCLUDED.probe_json, + fs_video = TRUE, + status = COALESCE(EXCLUDED.status, {}.status), + user_id = COALESCE(EXCLUDED.user_id, {}.user_id), + job_id = COALESCE(EXCLUDED.job_id, {}.job_id), + updated_at = CURRENT_TIMESTAMP + RETURNING id::bigint + "#, + table, table, table, table + ) ) .bind(&record.uuid) .bind(&record.file_path) @@ -528,8 +560,12 @@ impl PostgresDb { } } + let table = schema::table_name("videos"); let result = sqlx::query_as::<_, VideoRow>( - "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk, status, user_id, job_id FROM videos WHERE uuid = $1" + &format!( + "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk, status, user_id, job_id FROM {} WHERE uuid = $1", + table + ) ) .bind(uuid) .fetch_optional(&self.pool) @@ -549,8 +585,12 @@ impl PostgresDb { } pub async fn list_videos(&self) -> Result> { + let table = schema::table_name("videos"); let rows = sqlx::query_as::<_, VideoRow>( - "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk, status, user_id, job_id FROM videos ORDER BY id DESC" + &format!( + "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk, status, user_id, job_id FROM {} ORDER BY id DESC", + table + ) ) .fetch_all(&self.pool) .await?; @@ -572,9 +612,10 @@ impl PostgresDb { _ => return Err(anyhow::anyhow!("Invalid storage field: {}", field)), }; + let table = schema::table_name("videos"); sqlx::query(&format!( - "UPDATE videos SET {} = $1, updated_at = CURRENT_TIMESTAMP WHERE uuid = $2", - column + "UPDATE {} SET {} = $1, updated_at = CURRENT_TIMESTAMP WHERE uuid = $2", + table, column )) .bind(value) .bind(uuid) @@ -593,22 +634,30 @@ impl PostgresDb { let tx = self.pool.begin().await?; - sqlx::query("DELETE FROM chunk_vectors WHERE uuid = $1") + let chunk_vectors = schema::table_name("chunk_vectors"); + let chunks = schema::table_name("chunks"); + let processor_results = schema::table_name("processor_results"); + let videos = schema::table_name("videos"); + + sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", chunk_vectors)) .bind(uuid) .execute(&self.pool) .await?; - sqlx::query("DELETE FROM chunks WHERE uuid = $1") + sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", chunks)) .bind(uuid) .execute(&self.pool) .await?; - sqlx::query("DELETE FROM processor_results WHERE video_id IN (SELECT id FROM videos WHERE uuid = $1)") - .bind(uuid) - .execute(&self.pool) - .await?; + sqlx::query(&format!( + "DELETE FROM {} WHERE video_id IN (SELECT id FROM {} WHERE uuid = $1)", + processor_results, videos + )) + .bind(uuid) + .execute(&self.pool) + .await?; - sqlx::query("DELETE FROM videos WHERE uuid = $1") + sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", videos)) .bind(uuid) .execute(&self.pool) .await?; @@ -631,16 +680,19 @@ impl PostgresDb { } pub async fn get_chunk_count(&self, uuid: &str) -> Result<(i64, i64)> { - let sentence_count: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM chunks WHERE uuid = $1 AND chunk_type = 'sentence'", - ) + let chunks = schema::table_name("chunks"); + let sentence_count: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE uuid = $1 AND chunk_type = 'sentence'", + chunks + )) .bind(uuid) .fetch_one(&self.pool) .await?; - let time_count: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM chunks WHERE uuid = $1 AND chunk_type = 'time_based'", - ) + let time_count: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE uuid = $1 AND chunk_type = 'time_based'", + chunks + )) .bind(uuid) .fetch_one(&self.pool) .await?; @@ -649,10 +701,14 @@ impl PostgresDb { } pub async fn get_vector_count(&self, uuid: &str) -> Result { - let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM chunk_vectors WHERE uuid = $1") - .bind(uuid) - .fetch_one(&self.pool) - .await?; + let chunk_vectors = schema::table_name("chunk_vectors"); + let count: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE uuid = $1", + chunk_vectors + )) + .bind(uuid) + .fetch_one(&self.pool) + .await?; Ok(count) } @@ -662,13 +718,17 @@ impl PostgresDb { uuid: &str, video_path: Option<&str>, ) -> Result { + let table = schema::table_name("monitor_jobs"); let row = sqlx::query( - r#" - INSERT INTO monitor_jobs (uuid, video_path, status) - VALUES ($1, $2, 'pending') - ON CONFLICT DO NOTHING - RETURNING id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at - "# + &format!( + r#" + INSERT INTO {} (uuid, video_path, status) + VALUES ($1, $2, 'pending') + ON CONFLICT DO NOTHING + RETURNING id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at + "#, + table + ) ) .bind(uuid) .bind(video_path) @@ -696,13 +756,15 @@ impl PostgresDb { } pub async fn update_video_job_id(&self, video_id: i64, job_id: i32) -> Result<()> { - sqlx::query( + let videos = schema::table_name("videos"); + sqlx::query(&format!( r#" - UPDATE videos - SET job_id = $1, updated_at = CURRENT_TIMESTAMP - WHERE id = $2 - "#, - ) + UPDATE {} + SET job_id = $1, updated_at = CURRENT_TIMESTAMP + WHERE id = $2 + "#, + videos + )) .bind(job_id) .bind(video_id) .execute(&self.pool) @@ -712,13 +774,15 @@ impl PostgresDb { } pub async fn update_monitor_job_video_id(&self, job_id: i32, video_id: i64) -> Result<()> { - sqlx::query( + let monitor_jobs = schema::table_name("monitor_jobs"); + sqlx::query(&format!( r#" - UPDATE monitor_jobs - SET video_id = $1, updated_at = CURRENT_TIMESTAMP - WHERE id = $2 - "#, - ) + UPDATE {} + SET video_id = $1, updated_at = CURRENT_TIMESTAMP + WHERE id = $2 + "#, + monitor_jobs + )) .bind(video_id) .bind(job_id) .execute(&self.pool) @@ -728,11 +792,15 @@ impl PostgresDb { } pub async fn get_monitor_job_by_uuid(&self, uuid: &str) -> Result> { + let table = schema::table_name("monitor_jobs"); let row = sqlx::query( - r#" - SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at - FROM monitor_jobs WHERE uuid = $1 - "# + &format!( + r#" + SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at + FROM {} WHERE uuid = $1 + "#, + table + ) ) .bind(uuid) .fetch_optional(&self.pool) @@ -766,11 +834,15 @@ impl PostgresDb { &self, status: MonitorJobStatus, ) -> Result> { + let table = schema::table_name("monitor_jobs"); let rows = sqlx::query( - r#" - SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at - FROM monitor_jobs WHERE status = $1 ORDER BY created_at DESC - "# + &format!( + r#" + SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at + FROM {} WHERE status = $1 ORDER BY created_at DESC + "#, + table + ) ) .bind(status.as_str()) .fetch_all(&self.pool) @@ -814,9 +886,10 @@ impl PostgresDb { "" }; + let table = schema::table_name("monitor_jobs"); sqlx::query(&format!( - "UPDATE monitor_jobs SET status = $1, updated_at = CURRENT_TIMESTAMP{} WHERE uuid = $2", - started_at_clause + "UPDATE {} SET status = $1, updated_at = CURRENT_TIMESTAMP{} WHERE uuid = $2", + table, started_at_clause )) .bind(status.as_str()) .bind(uuid) @@ -833,16 +906,18 @@ impl PostgresDb { progress_current: i32, progress_total: i32, ) -> Result<()> { - sqlx::query( + let table = schema::table_name("monitor_jobs"); + sqlx::query(&format!( r#" - UPDATE monitor_jobs - SET current_processor = COALESCE($3, current_processor), - progress_current = $4, - progress_total = $5, - updated_at = CURRENT_TIMESTAMP - WHERE uuid = $1 - "#, - ) + UPDATE {} + SET current_processor = COALESCE($3, current_processor), + progress_current = $4, + progress_total = $5, + updated_at = CURRENT_TIMESTAMP + WHERE uuid = $1 + "#, + table + )) .bind(uuid) .bind(uuid) .bind(current_processor) @@ -855,15 +930,17 @@ impl PostgresDb { } pub async fn update_monitor_job_error(&self, uuid: &str, error: &str) -> Result<()> { - sqlx::query( + let table = schema::table_name("monitor_jobs"); + sqlx::query(&format!( r#" - UPDATE monitor_jobs - SET error_count = error_count + 1, - last_error = $2, - updated_at = CURRENT_TIMESTAMP - WHERE uuid = $1 - "#, - ) + UPDATE {} + SET error_count = error_count + 1, + last_error = $2, + updated_at = CURRENT_TIMESTAMP + WHERE uuid = $1 + "#, + table + )) .bind(uuid) .bind(error) .execute(&self.pool) @@ -873,7 +950,8 @@ impl PostgresDb { } pub async fn delete_monitor_job(&self, uuid: &str) -> Result { - let result = sqlx::query("DELETE FROM monitor_jobs WHERE uuid = $1") + let table = schema::table_name("monitor_jobs"); + let result = sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", table)) .bind(uuid) .execute(&self.pool) .await?; @@ -882,25 +960,34 @@ impl PostgresDb { } pub async fn get_monitor_job_stats(&self) -> Result { - let pending: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'pending'") - .fetch_one(&self.pool) - .await?; + let table = schema::table_name("monitor_jobs"); + let pending: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE status = 'pending'", + table + )) + .fetch_one(&self.pool) + .await?; - let running: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'running'") - .fetch_one(&self.pool) - .await?; + let running: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE status = 'running'", + table + )) + .fetch_one(&self.pool) + .await?; - let completed: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'completed'") - .fetch_one(&self.pool) - .await?; + let completed: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE status = 'completed'", + table + )) + .fetch_one(&self.pool) + .await?; - let failed: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'failed'") - .fetch_one(&self.pool) - .await?; + let failed: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE status = 'failed'", + table + )) + .fetch_one(&self.pool) + .await?; Ok(MonitorJobStats { pending: pending as i32, @@ -911,12 +998,16 @@ impl PostgresDb { } pub async fn create_api_key(&self, config: CreateApiKeyConfig<'_>) -> Result { + let table = schema::table_name("api_keys"); let result = sqlx::query( - r#" - INSERT INTO api_keys (key_id, key_hash, key_prefix, name, key_type, user_id, service_name, permissions, expires_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9) - RETURNING id - "#, + &format!( + r#" + INSERT INTO {} (key_id, key_hash, key_prefix, name, key_type, user_id, service_name, permissions, expires_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9) + RETURNING id + "#, + table + ), ) .bind(config.key_id) .bind(config.key_hash) @@ -935,14 +1026,16 @@ impl PostgresDb { } pub async fn get_api_key_by_hash(&self, key_hash: &str) -> Result> { - let result = sqlx::query_as::<_, ApiKeyRecord>( + let table = schema::table_name("api_keys"); + let result = sqlx::query_as::<_, ApiKeyRecord>(&format!( r#" - SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, - permissions, expires_at, last_used_at, last_used_ip, usage_count, status, - rotation_required, rotation_reason, grace_period_end, created_at, updated_at - FROM api_keys WHERE key_hash = $1 - "#, - ) + SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, + permissions, expires_at, last_used_at, last_used_ip, usage_count, status, + rotation_required, rotation_reason, grace_period_end, created_at, updated_at + FROM {} WHERE key_hash = $1 + "#, + table + )) .bind(key_hash) .fetch_optional(&self.pool) .await?; @@ -951,14 +1044,16 @@ impl PostgresDb { } pub async fn get_api_key_by_key_id(&self, key_id: &str) -> Result> { - let result = sqlx::query_as::<_, ApiKeyRecord>( + let table = schema::table_name("api_keys"); + let result = sqlx::query_as::<_, ApiKeyRecord>(&format!( r#" - SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, - permissions, expires_at, last_used_at, last_used_ip, usage_count, status, - rotation_required, rotation_reason, grace_period_end, created_at, updated_at - FROM api_keys WHERE key_id = $1 - "#, - ) + SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, + permissions, expires_at, last_used_at, last_used_ip, usage_count, status, + rotation_required, rotation_reason, grace_period_end, created_at, updated_at + FROM {} WHERE key_id = $1 + "#, + table + )) .bind(key_id) .fetch_optional(&self.pool) .await?; @@ -967,14 +1062,16 @@ impl PostgresDb { } pub async fn list_api_keys(&self) -> Result> { - let results = sqlx::query_as::<_, ApiKeyRecord>( + let table = schema::table_name("api_keys"); + let results = sqlx::query_as::<_, ApiKeyRecord>(&format!( r#" - SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, - permissions, expires_at, last_used_at, last_used_ip, usage_count, status, - rotation_required, rotation_reason, grace_period_end, created_at, updated_at - FROM api_keys ORDER BY created_at DESC - "#, - ) + SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, + permissions, expires_at, last_used_at, last_used_ip, usage_count, status, + rotation_required, rotation_reason, grace_period_end, created_at, updated_at + FROM {} ORDER BY created_at DESC + "#, + table + )) .fetch_all(&self.pool) .await?; @@ -982,16 +1079,18 @@ impl PostgresDb { } pub async fn update_api_key_usage(&self, key_id: &str, ip_address: Option<&str>) -> Result<()> { - sqlx::query( + let table = schema::table_name("api_keys"); + sqlx::query(&format!( r#" - UPDATE api_keys - SET last_used_at = CURRENT_TIMESTAMP, - last_used_ip = COALESCE($2, last_used_ip), - usage_count = usage_count + 1, - updated_at = CURRENT_TIMESTAMP - WHERE key_id = $1 - "#, - ) + UPDATE {} + SET last_used_at = CURRENT_TIMESTAMP, + last_used_ip = COALESCE($2, last_used_ip), + usage_count = usage_count + 1, + updated_at = CURRENT_TIMESTAMP + WHERE key_id = $1 + "#, + table + )) .bind(key_id) .bind(ip_address) .execute(&self.pool) @@ -1006,16 +1105,18 @@ impl PostgresDb { reason: &str, grace_period_end: chrono::DateTime, ) -> Result<()> { - sqlx::query( + let table = schema::table_name("api_keys"); + sqlx::query(&format!( r#" - UPDATE api_keys - SET rotation_required = TRUE, - rotation_reason = $2, - grace_period_end = $3, - updated_at = CURRENT_TIMESTAMP - WHERE key_id = $1 - "#, - ) + UPDATE {} + SET rotation_required = TRUE, + rotation_reason = $2, + grace_period_end = $3, + updated_at = CURRENT_TIMESTAMP + WHERE key_id = $1 + "#, + table + )) .bind(key_id) .bind(reason) .bind(grace_period_end) @@ -1144,12 +1245,14 @@ impl PostgresDb { &self, gitea_user: &str, ) -> Result> { - let results = sqlx::query_as::<_, GiteaTokenRecord>( + let table = schema::table_name("gitea_tokens"); + let results = sqlx::query_as::<_, GiteaTokenRecord>(&format!( r#" SELECT id, gitea_token_id, gitea_user, token_name, token_last_eight, scopes, api_key_id, last_verified, created_at - FROM gitea_tokens WHERE gitea_user = $1 ORDER BY created_at DESC + FROM {} WHERE gitea_user = $1 ORDER BY created_at DESC "#, - ) + table + )) .bind(gitea_user) .fetch_all(&self.pool) .await?; @@ -1162,12 +1265,14 @@ impl PostgresDb { gitea_user: &str, token_name: &str, ) -> Result> { - let result = sqlx::query_as::<_, GiteaTokenRecord>( + let table = schema::table_name("gitea_tokens"); + let result = sqlx::query_as::<_, GiteaTokenRecord>(&format!( r#" SELECT id, gitea_token_id, gitea_user, token_name, token_last_eight, scopes, api_key_id, last_verified, created_at - FROM gitea_tokens WHERE gitea_user = $1 AND token_name = $2 + FROM {} WHERE gitea_user = $1 AND token_name = $2 "#, - ) + table + )) .bind(gitea_user) .bind(token_name) .fetch_optional(&self.pool) @@ -1177,11 +1282,15 @@ impl PostgresDb { } pub async fn delete_gitea_token(&self, gitea_user: &str, token_name: &str) -> Result<()> { - sqlx::query("DELETE FROM gitea_tokens WHERE gitea_user = $1 AND token_name = $2") - .bind(gitea_user) - .bind(token_name) - .execute(&self.pool) - .await?; + let table = schema::table_name("gitea_tokens"); + sqlx::query(&format!( + "DELETE FROM {} WHERE gitea_user = $1 AND token_name = $2", + table + )) + .bind(gitea_user) + .bind(token_name) + .execute(&self.pool) + .await?; Ok(()) } @@ -1191,13 +1300,15 @@ impl PostgresDb { gitea_user: &str, token_name: &str, ) -> Result<()> { - sqlx::query( + let table = schema::table_name("gitea_tokens"); + sqlx::query(&format!( r#" - UPDATE gitea_tokens + UPDATE {} SET last_verified = CURRENT_TIMESTAMP WHERE gitea_user = $1 AND token_name = $2 "#, - ) + table + )) .bind(gitea_user) .bind(token_name) .execute(&self.pool) @@ -1214,13 +1325,15 @@ impl PostgresDb { momentry_api_key_id: Option<&str>, expires_at: Option>, ) -> Result { - let result = sqlx::query( + let table = schema::table_name("n8n_api_keys"); + let result = sqlx::query(&format!( r#" - INSERT INTO n8n_api_keys (n8n_key_id, label, api_key_last_eight, momentry_api_key_id, expires_at) + INSERT INTO {} (n8n_key_id, label, api_key_last_eight, momentry_api_key_id, expires_at) VALUES ($1, $2, $3, $4, $5) RETURNING id "#, - ) + table + )) .bind(n8n_key_id) .bind(label) .bind(api_key_last_eight) @@ -1234,12 +1347,14 @@ impl PostgresDb { } pub async fn get_n8n_api_keys(&self) -> Result> { - let results = sqlx::query_as::<_, N8nApiKeyRecord>( + let table = schema::table_name("n8n_api_keys"); + let results = sqlx::query_as::<_, N8nApiKeyRecord>(&format!( r#" SELECT id, n8n_key_id, label, api_key_last_eight, momentry_api_key_id, expires_at, last_verified, created_at - FROM n8n_api_keys ORDER BY created_at DESC + FROM {} ORDER BY created_at DESC "#, - ) + table + )) .fetch_all(&self.pool) .await?; @@ -1247,12 +1362,14 @@ impl PostgresDb { } pub async fn get_n8n_api_key_by_label(&self, label: &str) -> Result> { - let result = sqlx::query_as::<_, N8nApiKeyRecord>( + let table = schema::table_name("n8n_api_keys"); + let result = sqlx::query_as::<_, N8nApiKeyRecord>(&format!( r#" SELECT id, n8n_key_id, label, api_key_last_eight, momentry_api_key_id, expires_at, last_verified, created_at - FROM n8n_api_keys WHERE label = $1 + FROM {} WHERE label = $1 "#, - ) + table + )) .bind(label) .fetch_optional(&self.pool) .await?; @@ -1261,7 +1378,8 @@ impl PostgresDb { } pub async fn delete_n8n_api_key(&self, label: &str) -> Result<()> { - sqlx::query("DELETE FROM n8n_api_keys WHERE label = $1") + let table = schema::table_name("n8n_api_keys"); + sqlx::query(&format!("DELETE FROM {} WHERE label = $1", table)) .bind(label) .execute(&self.pool) .await?; @@ -1270,13 +1388,15 @@ impl PostgresDb { } pub async fn update_n8n_api_key_verification(&self, label: &str) -> Result<()> { - sqlx::query( + let table = schema::table_name("n8n_api_keys"); + sqlx::query(&format!( r#" - UPDATE n8n_api_keys + UPDATE {} SET last_verified = CURRENT_TIMESTAMP WHERE label = $1 "#, - ) + table + )) .bind(label) .execute(&self.pool) .await?; @@ -1759,14 +1879,36 @@ impl PostgresDb { } pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> { + let table = schema::table_name("chunks"); let content_with_rule = serde_json::json!({ "rule": chunk.rule.as_str(), "data": chunk.content }); - sqlx::query( + // 獲取文本內容:優先使用 chunk.text_content,否則從 content 中提取 + let raw_text = chunk.text_content.as_deref().unwrap_or_else(|| { + // 從 content 中提取文本(支持中文和英文格式) + chunk + .content + .get("data") + .and_then(|data| data.get("text")) + .and_then(|v| v.as_str()) + .or_else(|| chunk.content.get("text").and_then(|v| v.as_str())) + .unwrap_or("") + }); + + // 對中文文本進行分詞 + let tokenized_text = if raw_text.is_empty() { + None + } else { + Some(crate::core::text::tokenizer::tokenize_chinese_text( + raw_text, + )) + }; + + sqlx::query(&format!( r#" - INSERT INTO chunks (file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids) + INSERT INTO {} (file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14, $15, $16, $17, $18) ON CONFLICT (uuid, chunk_id) DO UPDATE SET start_time = EXCLUDED.start_time, @@ -1783,8 +1925,9 @@ impl PostgresDb { parent_chunk_id = EXCLUDED.parent_chunk_id, child_chunk_ids = EXCLUDED.child_chunk_ids, updated_at = CURRENT_TIMESTAMP - "# - ) + "#, + table + )) .bind(chunk.file_id) .bind(&chunk.uuid) .bind(&chunk.chunk_id) @@ -1795,7 +1938,7 @@ impl PostgresDb { .bind(chunk.fps) .bind(chunk.start_frame) .bind(chunk.end_frame) - .bind(&chunk.text_content) + .bind(&tokenized_text) .bind(&content_with_rule) .bind(&chunk.metadata) .bind(&chunk.vector_id) @@ -1810,9 +1953,11 @@ impl PostgresDb { } pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result> { - let rows = sqlx::query( - "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE uuid = $1 ORDER BY chunk_index" - ) + let table = schema::table_name("chunks"); + let rows = sqlx::query(&format!( + "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM {} WHERE uuid = $1 ORDER BY chunk_index", + table + )) .bind(uuid) .fetch_all(&self.pool) .await?; @@ -1884,9 +2029,11 @@ impl PostgresDb { } pub async fn get_chunk_by_chunk_id(&self, chunk_id: &str) -> Result> { - let row = sqlx::query( - "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE chunk_id = $1" - ) + let table = schema::table_name("chunks"); + let row = sqlx::query(&format!( + "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM {} WHERE chunk_id = $1", + table + )) .bind(chunk_id) .fetch_optional(&self.pool) .await?; @@ -1958,9 +2105,11 @@ impl PostgresDb { chunk_id: &str, uuid: &str, ) -> Result> { - let row = sqlx::query( - "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE chunk_id = $1 AND uuid = $2" - ) + let table = schema::table_name("chunks"); + let row = sqlx::query(&format!( + "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM {} WHERE chunk_id = $1 AND uuid = $2", + table + )) .bind(chunk_id) .bind(uuid) .fetch_optional(&self.pool) @@ -2029,9 +2178,10 @@ impl PostgresDb { } pub async fn store_pre_chunk(&self, pre_chunk: &PreChunk) -> Result { - let row = sqlx::query( + let table = schema::table_name("pre_chunks"); + let row = sqlx::query(&format!( r#" - INSERT INTO pre_chunks (file_id, source_type, source_file, chunk_type, start_time, end_time, start_frame, end_frame, fps, raw_json, text_content, processed, chunk_id) + INSERT INTO {} (file_id, source_type, source_file, chunk_type, start_time, end_time, start_frame, end_frame, fps, raw_json, text_content, processed, chunk_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (file_id, source_type, start_frame, end_frame) DO UPDATE SET start_time = EXCLUDED.start_time, @@ -2042,8 +2192,9 @@ impl PostgresDb { processed = EXCLUDED.processed, chunk_id = EXCLUDED.chunk_id RETURNING id - "# - ) + "#, + table + )) .bind(pre_chunk.file_id) .bind(&pre_chunk.source_type) .bind(&pre_chunk.source_file) @@ -2065,17 +2216,19 @@ impl PostgresDb { } pub async fn store_frame(&self, frame: &Frame) -> Result<()> { - sqlx::query( + let table = schema::table_name("frames"); + sqlx::query(&format!( r#" - INSERT INTO frames (file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path) + INSERT INTO {} (file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (file_id, frame_number) DO UPDATE SET yolo_objects = EXCLUDED.yolo_objects, ocr_results = EXCLUDED.ocr_results, face_results = EXCLUDED.face_results, frame_path = EXCLUDED.frame_path - "# - ) + "#, + table + )) .bind(frame.file_id) .bind(frame.frame_number) .bind(frame.timestamp) @@ -2096,6 +2249,7 @@ impl PostgresDb { start_time: f64, end_time: f64, ) -> Result> { + let table = schema::table_name("frames"); let rows = sqlx::query_as::<_, ( i32, i32, @@ -2107,12 +2261,13 @@ impl PostgresDb { Option, Option, String, - )>( + )>(&format!( "SELECT id, file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path, created_at - FROM frames + FROM {} WHERE file_id = $1 AND timestamp >= $2 AND timestamp <= $3 - ORDER BY frame_number" - ) + ORDER BY frame_number", + table + )) .bind(file_id) .bind(start_time) .bind(end_time) @@ -2144,12 +2299,14 @@ impl PostgresDb { start_time: f64, end_time: f64, ) -> Result> { - let rows = sqlx::query( + let table = schema::table_name("chunks"); + let rows = sqlx::query(&format!( "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids - FROM chunks + FROM {} WHERE file_id = $1 AND start_time >= $2 AND end_time <= $3 - ORDER BY start_time" - ) + ORDER BY start_time", + table + )) .bind(file_id) .bind(start_time) .bind(end_time) @@ -2227,9 +2384,11 @@ impl PostgresDb { return Ok(vec![]); } - let rows = sqlx::query( - "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE chunk_id = ANY($1) ORDER BY chunk_index", - ) + let table = schema::table_name("chunks"); + let rows = sqlx::query(&format!( + "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM {} WHERE chunk_id = ANY($1) ORDER BY chunk_index", + table + )) .bind(chunk_ids) .fetch_all(&self.pool) .await?; @@ -2301,7 +2460,8 @@ impl PostgresDb { } pub async fn get_file_id_by_uuid(&self, uuid: &str) -> Result { - let row = sqlx::query("SELECT id FROM videos WHERE uuid = $1") + let table = schema::table_name("videos"); + let row = sqlx::query(&format!("SELECT id FROM {} WHERE uuid = $1", table)) .bind(uuid) .fetch_one(&self.pool) .await?; @@ -2310,16 +2470,18 @@ impl PostgresDb { } pub async fn store_vector(&self, chunk_id: &str, vector: &[f32], uuid: &str) -> Result<()> { + let table = schema::table_name("chunk_vectors"); let vector_json = serde_json::json!(vector); - sqlx::query( + sqlx::query(&format!( r#" - INSERT INTO chunk_vectors (chunk_id, uuid, chunk_type, embedding) + INSERT INTO {} (chunk_id, uuid, chunk_type, embedding) VALUES ($1, $2, 'sentence', $3::jsonb) ON CONFLICT (chunk_id, uuid) DO UPDATE SET embedding = EXCLUDED.embedding "#, - ) + table + )) .bind(chunk_id) .bind(uuid) .bind(&vector_json) @@ -2331,11 +2493,15 @@ impl PostgresDb { } pub async fn update_vector_id(&self, chunk_id: &str, vector_id: &str) -> Result<()> { - sqlx::query("UPDATE chunks SET vector_id = $1 WHERE chunk_id = $2") - .bind(vector_id) - .bind(chunk_id) - .execute(&self.pool) - .await?; + let table = schema::table_name("chunks"); + sqlx::query(&format!( + "UPDATE {} SET vector_id = $1 WHERE chunk_id = $2", + table + )) + .bind(vector_id) + .bind(chunk_id) + .execute(&self.pool) + .await?; Ok(()) } @@ -2349,11 +2515,12 @@ impl PostgresDb { } pub async fn search_text(&self, query: &str, chunk_type: Option<&str>) -> Result> { + let table = schema::table_name("chunks"); let query_pattern = format!("%{}%", query); let sql = match chunk_type { - Some(_) => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id, parent_chunk_id, child_chunk_ids FROM chunks WHERE content->>'text' ILIKE $1 AND chunk_type = $2 ORDER BY chunk_index", - None => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id, parent_chunk_id, child_chunk_ids FROM chunks WHERE content->>'text' ILIKE $1 ORDER BY chunk_index", + Some(_) => &format!("SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id, parent_chunk_id, child_chunk_ids FROM {} WHERE content->>'text' ILIKE $1 AND chunk_type = $2 ORDER BY chunk_index", table), + None => &format!("SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id, parent_chunk_id, child_chunk_ids FROM {} WHERE content->>'text' ILIKE $1 ORDER BY chunk_index", table), }; let chunks = if let Some(ct) = chunk_type { @@ -2454,29 +2621,32 @@ impl PostgresDb { uuid: Option<&str>, limit: usize, ) -> Result> { + let table = schema::table_name("chunks"); let tsquery = self.prepare_tsquery(query)?; let sql = match uuid { - Some(_) => { + Some(_) => &format!( r#" SELECT chunk_id, uuid, chunk_index, chunk_type, start_time, end_time, - text_content, ts_rank_cd(search_vector, $1) as bm25_score - FROM chunks - WHERE search_vector @@ $1 AND uuid = $2 + text_content, ts_rank_cd(search_vector, $1::tsquery) as bm25_score + FROM {} + WHERE search_vector @@ $1::tsquery AND uuid = $2 ORDER BY bm25_score DESC LIMIT $3 - "# - } - None => { + "#, + table + ), + None => &format!( r#" SELECT chunk_id, uuid, chunk_index, chunk_type, start_time, end_time, - text_content, ts_rank_cd(search_vector, $1) as bm25_score - FROM chunks - WHERE search_vector @@ $1 + text_content, ts_rank_cd(search_vector, $1::tsquery) as bm25_score + FROM {} + WHERE search_vector @@ $1::tsquery ORDER BY bm25_score DESC LIMIT $2 - "# - } + "#, + table + ), }; let rows = if let Some(uuid) = uuid { @@ -2520,7 +2690,16 @@ impl PostgresDb { vector_weight: f32, bm25_weight: f32, ) -> Result> { + tracing::info!( + "hybrid_search called: query={}, uuid={:?}, limit={}, vector_weight={}, bm25_weight={}", + query, + uuid, + limit, + vector_weight, + bm25_weight + ); let bm25_results = self.search_bm25(query, uuid, limit * 2).await?; + tracing::info!("bm25_results count: {}", bm25_results.len()); let qdrant = QdrantDb::init().await?; let vector_results = if let Some(uuid) = uuid { @@ -2528,8 +2707,9 @@ impl PostgresDb { } else { qdrant.search(query_vector, limit * 2).await? }; + tracing::info!("vector_results count: {}", vector_results.len()); - let mut combined: std::collections::HashMap = + let mut combined: std::collections::HashMap<(String, String), HybridSearchResult> = std::collections::HashMap::new(); let max_bm25 = bm25_results @@ -2541,7 +2721,7 @@ impl PostgresDb { let normalized_score = r.bm25_score / max_bm25; let combined_score = (normalized_score * bm25_weight) as f64; combined.insert( - r.chunk_id.clone(), + (r.chunk_id.clone(), r.uuid.clone()), HybridSearchResult { chunk_id: r.chunk_id.clone(), uuid: r.uuid.clone(), @@ -2563,26 +2743,34 @@ impl PostgresDb { .unwrap_or(1.0) .max(0.001); - let chunk_ids: Vec = vector_results.iter().map(|r| r.chunk_id.clone()).collect(); - let vector_chunks = self.get_chunks_by_ids(&chunk_ids).await?; - let chunk_map: std::collections::HashMap = vector_chunks - .iter() - .map(|c| (c.chunk_id.clone(), c)) - .collect(); + // Build map from (chunk_id, uuid) to Chunk to handle duplicate chunk_ids across videos + let mut chunk_map: std::collections::HashMap<(String, String), Chunk> = + std::collections::HashMap::new(); + for search_result in &vector_results { + if let Ok(Some(chunk)) = self + .get_chunk_by_chunk_id_and_uuid(&search_result.chunk_id, &search_result.uuid) + .await + { + chunk_map.insert( + (search_result.chunk_id.clone(), search_result.uuid.clone()), + chunk, + ); + } + } for r in &vector_results { let normalized_score = r.score / max_vector; let combined_score = (normalized_score * vector_weight) as f64; - if let Some(existing) = combined.get_mut(&r.chunk_id) { + if let Some(existing) = combined.get_mut(&(r.chunk_id.clone(), r.uuid.clone())) { existing.vector_score = normalized_score as f64; existing.combined_score += combined_score; } else { - let chunk_data = chunk_map.get(&r.chunk_id); + let chunk_data = chunk_map.get(&(r.chunk_id.clone(), r.uuid.clone())); combined.insert( - r.chunk_id.clone(), + (r.chunk_id.clone(), r.uuid.clone()), HybridSearchResult { chunk_id: r.chunk_id.clone(), - uuid: chunk_data.map(|c| c.uuid.clone()).unwrap_or_default(), + uuid: r.uuid.clone(), chunk_index: chunk_data.map(|c| c.chunk_index).unwrap_or(0), chunk_type: chunk_data .map(|c| c.chunk_type.as_str().to_string()) @@ -2612,27 +2800,157 @@ impl PostgresDb { } fn prepare_tsquery_internal(&self, query: &str) -> Result { - let words: Vec = query - .split_whitespace() - .map(|w| { - let cleaned = w - .chars() - .filter(|c| c.is_alphanumeric()) - .collect::(); - if cleaned.is_empty() { - String::new() - } else { - format!("{}:*", cleaned.to_lowercase()) - } - }) - .filter(|w| !w.is_empty()) - .collect(); + let expander = global_synonym_expander(); - if words.is_empty() { - anyhow::bail!("Query contains no searchable terms"); + // 對中文查詢進行特殊處理 + let processed_query = if contains_chinese(query) { + // 先將簡體中文轉換為繁體中文(假設資料庫儲存繁體中文) + let normalized = normalize_chinese_query(query); + + // 使用智能同義詞擴展,然後對剩餘部分進行分詞 + let expanded = expander.expand_chinese_query(&normalized); + + // 如果擴展查詢包含 '&',表示已經進行了同義詞擴展 + if expanded.contains('&') { + expanded + } else { + // 沒有找到同義詞,進行常規分詞 + tokenize_chinese_text(&expanded) + } + } else { + query.to_string() + }; + + // 解析查詢字符串,處理同義詞組 + let groups = Self::parse_query_groups(&processed_query); + + let mut tsquery_groups = Vec::new(); + + for group in groups { + if group.is_empty() { + continue; + } + + // 檢查是否為同義詞組(格式: (詞語1 | 詞語2 | ...)) + let terms: Vec<&str> = if group.starts_with('(') && group.ends_with(')') { + // 提取括號內的詞語 + let inner = &group[1..group.len() - 1]; + inner.split('|').map(|s| s.trim()).collect() + } else { + // 單個詞語 + vec![group.as_str()] + }; + + // 為每個詞語生成 tsquery 片段 + let mut term_tsqueries = Vec::new(); + + for term in terms { + // 將詞語按空白字符分割(處理像 "電 腦" 這樣的已分詞詞語) + let parts: Vec<&str> = term.split_whitespace().collect(); + + // 清理每個部分並加上前綴搜索符號 + let cleaned_parts: Vec = parts + .iter() + .map(|part| { + // 保留字母数字字符和Unicode字母字符(包括中文) + let cleaned = part + .chars() + .filter(|c| c.is_alphanumeric() || c.is_alphabetic()) + .collect::(); + if cleaned.is_empty() { + None + } else { + Some(format!("{}:*", cleaned.to_lowercase())) + } + }) + .flatten() + .collect(); + + if cleaned_parts.is_empty() { + continue; // 跳過無效部分 + } + + // 如果只有一個部分,直接使用;多個部分用 AND 連接 + let term_tsquery = if cleaned_parts.len() == 1 { + cleaned_parts[0].clone() + } else { + cleaned_parts.join(" & ") + }; + + term_tsqueries.push(term_tsquery); + } + + if term_tsqueries.is_empty() { + continue; // 跳過無效詞語組 + } + + // 如果只有一個詞語 tsquery,不需括號;多個詞語用括號和 OR 連接 + let tsquery_group = if term_tsqueries.len() == 1 { + term_tsqueries[0].clone() + } else { + format!("({})", term_tsqueries.join(" | ")) + }; + tsquery_groups.push(tsquery_group); } - Ok(words.join(" & ")) + // 如果没有可搜索的术语,返回一个不会匹配任何内容的安全查询 + // 而不是报错,这样BM25搜索将返回空结果,但不会导致500错误 + if tsquery_groups.is_empty() { + return Ok("__no_match__:*".to_string()); + } + + Ok(tsquery_groups.join(" & ")) + } + + /// 解析查詢字符串,識別同義詞組(用括號包圍的部分) + fn parse_query_groups(query: &str) -> Vec { + let mut groups = Vec::new(); + let mut current_group = String::new(); + let mut paren_depth = 0; + + for ch in query.chars() { + match ch { + '(' => { + if paren_depth > 0 { + current_group.push(ch); + } + paren_depth += 1; + current_group.push(ch); + } + ')' => { + paren_depth -= 1; + current_group.push(ch); + if paren_depth == 0 { + groups.push(current_group.trim().to_string()); + current_group.clear(); + } + } + '&' if paren_depth == 0 => { + // 在同義詞組外遇到 &,分隔符 + if !current_group.trim().is_empty() { + groups.push(current_group.trim().to_string()); + current_group.clear(); + } + } + _ if paren_depth == 0 && ch.is_whitespace() => { + // 在同義詞組外遇到空白,分隔符 + if !current_group.trim().is_empty() { + groups.push(current_group.trim().to_string()); + current_group.clear(); + } + } + _ => { + current_group.push(ch); + } + } + } + + // 處理最後一個組 + if !current_group.trim().is_empty() { + groups.push(current_group.trim().to_string()); + } + + groups } } @@ -2668,18 +2986,21 @@ impl PostgresDb { } pub async fn get_pending_jobs(&self, limit: i32) -> Result> { - let rows = sqlx::query( + let monitor_jobs = schema::table_name("monitor_jobs"); + let processor_results = schema::table_name("processor_results"); + let rows = sqlx::query(&format!( r#" SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at - FROM monitor_jobs + FROM {} WHERE status = 'pending' - OR (status = 'running' AND EXISTS (SELECT 1 FROM processor_results WHERE job_id = monitor_jobs.id AND status = 'pending')) + OR (status = 'running' AND EXISTS (SELECT 1 FROM {} WHERE job_id = monitor_jobs.id AND status = 'pending')) ORDER BY created_at ASC LIMIT $1 FOR UPDATE SKIP LOCKED - "# - ) + "#, + monitor_jobs, processor_results + )) .bind(limit) .fetch_all(&self.pool) .await?; @@ -2714,22 +3035,25 @@ impl PostgresDb { &self, limit: i32, ) -> Result> { - let rows = sqlx::query( + let monitor_jobs = schema::table_name("monitor_jobs"); + let processor_results = schema::table_name("processor_results"); + let rows = sqlx::query(&format!( r#" SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at - FROM monitor_jobs + FROM {} WHERE status = 'running' AND NOT EXISTS ( - SELECT 1 FROM processor_results pr + SELECT 1 FROM {} pr WHERE pr.job_id = monitor_jobs.id AND pr.status IN ('pending', 'running') ) ORDER BY updated_at ASC LIMIT $1 FOR UPDATE SKIP LOCKED - "# - ) + "#, + monitor_jobs, processor_results + )) .bind(limit) .fetch_all(&self.pool) .await?; @@ -2766,13 +3090,15 @@ impl PostgresDb { completed_processors: Vec, failed_processors: Vec, ) -> Result<()> { - sqlx::query( - "UPDATE monitor_jobs + let table = schema::table_name("monitor_jobs"); + sqlx::query(&format!( + "UPDATE {} SET completed_processors = $1, failed_processors = $2, updated_at = CURRENT_TIMESTAMP WHERE id = $3", - ) + table + )) .bind(completed_processors) .bind(failed_processors) .bind(job_id) @@ -2782,9 +3108,11 @@ impl PostgresDb { } pub async fn update_job_status(&self, job_id: i32, status: MonitorJobStatus) -> Result<()> { - sqlx::query( - "UPDATE monitor_jobs SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2", - ) + let table = schema::table_name("monitor_jobs"); + sqlx::query(&format!( + "UPDATE {} SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2", + table + )) .bind(status.as_str()) .bind(job_id) .execute(&self.pool) @@ -2798,13 +3126,15 @@ impl PostgresDb { current_processor: Option<&str>, progress_current: i32, ) -> Result<()> { - sqlx::query( + let table = schema::table_name("monitor_jobs"); + sqlx::query(&format!( r#" - UPDATE monitor_jobs + UPDATE {} SET current_processor = $1, progress_current = $2, updated_at = CURRENT_TIMESTAMP WHERE id = $3 "#, - ) + table + )) .bind(current_processor) .bind(progress_current) .bind(job_id) @@ -2818,14 +3148,16 @@ impl PostgresDb { job_id: i32, processor_type: ProcessorType, ) -> Result { - let row = sqlx::query( + let table = schema::table_name("processor_results"); + let row = sqlx::query(&format!( r#" - INSERT INTO processor_results (job_id, processor, status) + INSERT INTO {} (job_id, processor, status) VALUES ($1, $2, 'pending') ON CONFLICT (job_id, processor) DO UPDATE SET job_id = EXCLUDED.job_id RETURNING id "#, - ) + table + )) .bind(job_id) .bind(processor_type.as_str()) .fetch_one(&self.pool) @@ -2842,9 +3174,10 @@ impl PostgresDb { error_message: Option<&str>, output_data: Option<&serde_json::Value>, ) -> Result<()> { - sqlx::query( + let table = schema::table_name("processor_results"); + sqlx::query(&format!( r#" - UPDATE processor_results + UPDATE {} SET status = $1, error_message = $2, output_data = $3, @@ -2853,7 +3186,8 @@ impl PostgresDb { updated_at = CURRENT_TIMESTAMP WHERE id = $4 "#, - ) + table + )) .bind(status.as_str()) .bind(error_message) .bind(output_data) @@ -2864,16 +3198,18 @@ impl PostgresDb { } pub async fn get_processor_results_by_job(&self, job_id: i32) -> Result> { - let rows = sqlx::query( + let table = schema::table_name("processor_results"); + let rows = sqlx::query(&format!( r#" SELECT id, job_id, processor, status, output_path, started_at, completed_at, error_message, progress_total, progress_current, last_checkpoint, created_at, updated_at, duration_secs - FROM processor_results + FROM {} WHERE job_id = $1 ORDER BY created_at ASC "#, - ) + table + )) .bind(job_id) .fetch_all(&self.pool) .await?; @@ -2910,8 +3246,9 @@ impl PostgresDb { } pub async fn get_video_status(&self, uuid: &str) -> Result> { + let table = schema::table_name("videos"); let result: Option = - sqlx::query_scalar("SELECT status FROM videos WHERE uuid = $1") + sqlx::query_scalar(&format!("SELECT status FROM {} WHERE uuid = $1", table)) .bind(uuid) .fetch_optional(&self.pool) .await?; @@ -2920,9 +3257,11 @@ impl PostgresDb { } pub async fn update_video_status(&self, uuid: &str, status: VideoStatus) -> Result<()> { - sqlx::query( - "UPDATE videos SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE uuid = $2", - ) + let table = schema::table_name("videos"); + sqlx::query(&format!( + "UPDATE {} SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE uuid = $2", + table + )) .bind(status.as_str()) .bind(uuid) .execute(&self.pool) @@ -2931,10 +3270,13 @@ impl PostgresDb { } pub async fn get_running_job_count(&self) -> Result { - let count: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'running'") - .fetch_one(&self.pool) - .await?; + let table = schema::table_name("monitor_jobs"); + let count: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE status = 'running'", + table + )) + .fetch_one(&self.pool) + .await?; Ok(count) } } @@ -3154,9 +3496,10 @@ mod tests { let words: Vec = query .split_whitespace() .map(|w| { + // 保留字母数字字符和Unicode字母字符(包括中文) let cleaned = w .chars() - .filter(|c| c.is_alphanumeric()) + .filter(|c| c.is_alphanumeric() || c.is_alphabetic()) .collect::(); if cleaned.is_empty() { String::new() @@ -3167,8 +3510,10 @@ mod tests { .filter(|w| !w.is_empty()) .collect(); + // 如果没有可搜索的术语,返回一个不会匹配任何内容的安全查询 + // 而不是报错,这样BM25搜索将返回空结果,但不会导致500错误 if words.is_empty() { - anyhow::bail!("Query contains no searchable terms"); + return Ok("__no_match__:*".to_string()); } Ok(words.join(" & ")) @@ -3190,10 +3535,21 @@ mod tests { assert_eq!(tsquery, "search:*"); } + #[test] + fn test_prepare_tsquery_chinese() { + let query = "電腦 測試"; + let tsquery = prepare_tsquery_test_helper(query).unwrap(); + assert!(tsquery.contains("電腦:*")); + assert!(tsquery.contains("測試:*")); + assert!(tsquery.contains(" & ")); + } + #[test] fn test_prepare_tsquery_empty_result() { let query = " !!! "; let result = prepare_tsquery_test_helper(query); - assert!(result.is_err()); + // 现在应该返回安全查询而不是错误 + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "__no_match__:*"); } } diff --git a/src/core/db/schema.rs b/src/core/db/schema.rs new file mode 100644 index 0000000..8da8477 --- /dev/null +++ b/src/core/db/schema.rs @@ -0,0 +1,30 @@ +use crate::core::config::DATABASE_SCHEMA; +use once_cell::sync::Lazy; + +pub static SCHEMA_PREFIX: Lazy = Lazy::new(|| { + let schema = DATABASE_SCHEMA.as_str(); + if schema == "public" { + String::new() + } else { + format!("{}.", schema) + } +}); + +pub fn table_name(table: &str) -> String { + let prefix = SCHEMA_PREFIX.as_str(); + if prefix.is_empty() { + table.to_string() + } else { + format!("{}{}", prefix, table) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_table_name_public() { + assert_eq!(table_name("videos"), "videos"); + } +}