diff --git a/Cargo.lock b/Cargo.lock index 38ce3b0..3abeadd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -32,20 +32,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" -[[package]] -name = "anstream" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" -dependencies = [ - "anstyle", - "anstyle-parse", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "utf8parse", -] - [[package]] name = "anstyle" version = "1.0.6" @@ -53,38 +39,76 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] -name = "anstyle-parse" -version = "0.2.3" +name = "async-trait" +version = "0.1.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ - "utf8parse", + "proc-macro2", + "quote", + "syn", ] [[package]] -name = "anstyle-query" -version = "1.0.2" +name = "autocfg" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" -dependencies = [ - "windows-sys 0.52.0", -] +checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" [[package]] -name = "anstyle-wincon" -version = "3.0.2" +name = "axum" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ - "anstyle", - "windows-sys 0.52.0", + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", ] [[package]] -name = "autocfg" -version = "1.2.0" +name = "axum-core" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] [[package]] name = "backtrace" @@ -101,12 +125,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base64" -version = "0.21.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" - [[package]] name = "bitflags" version = "1.3.2" @@ -114,13 +132,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] 
-name = "block-buffer" -version = "0.10.4" +name = "bitflags" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" -dependencies = [ - "generic-array", -] +checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" [[package]] name = "bumpalo" @@ -128,12 +143,6 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "bytes" version = "1.6.0" @@ -210,21 +219,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" -[[package]] -name = "colorchoice" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" - -[[package]] -name = "cpufeatures" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" -dependencies = [ - "libc", -] - [[package]] name = "criterion" version = "0.5.1" @@ -292,76 +286,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" -[[package]] -name = "crypto-common" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" -dependencies = [ - "generic-array", - "typenum", -] - -[[package]] -name = "data-encoding" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" - -[[package]] -name = "digest" -version = "0.10.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" -dependencies = [ - "block-buffer", - "crypto-common", -] - [[package]] name = "either" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" -[[package]] -name = "encoding_rs" -version = "0.8.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "env_filter" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" -dependencies = [ - "log", - "regex", -] - -[[package]] -name = "env_logger" -version = "0.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "humantime", - "log", -] - -[[package]] -name = "equivalent" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" - [[package]] name = "fnv" version = 
"1.0.7" @@ -394,7 +324,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -403,12 +332,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" -[[package]] -name = "futures-sink" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" - [[package]] name = "futures-task" version = "0.3.30" @@ -422,21 +345,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", - "futures-sink", "futures-task", "pin-project-lite", "pin-utils", - "slab", -] - -[[package]] -name = "generic-array" -version = "0.14.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" -dependencies = [ - "typenum", - "version_check", ] [[package]] @@ -456,25 +367,6 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" -[[package]] -name = "h2" -version = "0.3.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.12", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "half" version = "2.4.1" @@ -485,51 +377,24 @@ dependencies = [ "crunchy", ] -[[package]] -name = "hashbrown" -version = "0.14.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" - [[package]] name = "haystackdb" version = "0.1.0" dependencies = [ + "axum", "criterion", - "env_logger", "fs2", - "log", "memmap", + "parking_lot", "rayon", "serde", "serde_json", "tokio", + "tower", + "tower-http", + "tracing", + "tracing-subscriber", "uuid", - "warp", -] - -[[package]] -name = "headers" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" -dependencies = [ - "base64", - "bytes", - "headers-core", - "http 0.2.12", - "httpdate", - "mime", - "sha1", -] - -[[package]] -name = "headers-core" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" -dependencies = [ - "http 0.2.12", ] [[package]] @@ -540,9 +405,9 @@ checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "http" -version = "0.2.12" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -550,32 +415,33 @@ dependencies = [ ] [[package]] -name = "http" -version = "1.1.0" +name = "http-body" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "fnv", - "itoa", + "http", ] [[package]] -name = "http-body" -version = "0.4.6" +name = "http-body-util" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", - "http 0.2.12", + "futures-core", + "http", + "http-body", "pin-project-lite", ] [[package]] name = "httparse" -version = "1.8.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" [[package]] name = "httpdate" @@ -583,54 +449,38 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" -version = "0.14.28" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", - "h2", - "http 0.2.12", + "http", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2", + "smallvec", "tokio", - "tower-service", - "tracing", - "want", -] - -[[package]] -name = "idna" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" -dependencies = [ - "unicode-bidi", - "unicode-normalization", ] [[package]] -name = "indexmap" -version = "2.2.6" +name = "hyper-util" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ - "equivalent", - "hashbrown", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", ] [[package]] @@ -668,6 +518,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.153" @@ -690,6 +546,21 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name 
= "memchr" version = "2.7.2" @@ -712,16 +583,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "miniz_oxide" version = "0.7.2" @@ -743,21 +604,12 @@ dependencies = [ ] [[package]] -name = "multer" -version = "2.1.0" +name = "nu-ansi-term" +version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "bytes", - "encoding_rs", - "futures-util", - "http 0.2.12", - "httparse", - "log", - "memchr", - "mime", - "spin", - "version_check", + "windows-sys 0.61.2", ] [[package]] @@ -889,17 +741,11 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "ppv-lite86" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" - [[package]] name = "proc-macro2" -version = "1.0.81" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" dependencies = [ "unicode-ident", ] @@ -913,36 +759,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom", -] - [[package]] name = "rayon" version = "1.10.0" @@ -969,7 +785,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -1007,6 +823,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + [[package]] name = "ryu" version = "1.0.17" @@ -1022,12 +844,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -1036,18 +852,28 @@ checksum = 
"94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.198" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.198" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -1065,6 +891,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1078,14 +915,12 @@ dependencies = [ ] [[package]] -name = "sha1" -version = "0.10.6" +name = "sharded-slab" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ - "cfg-if", - "cpufeatures", - "digest", + "lazy_static", ] [[package]] @@ -1097,15 +932,6 @@ dependencies = [ "libc", ] -[[package]] -name = "slab" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] - [[package]] name = "smallvec" version = "1.13.2" @@ -1122,17 +948,11 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "syn" -version = "2.0.60" +version = "2.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" dependencies = [ "proc-macro2", "quote", @@ -1140,23 +960,18 @@ dependencies = [ ] [[package]] -name = "thiserror" -version = "1.0.59" +name = "sync_wrapper" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa" -dependencies = [ - "thiserror-impl", -] +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" [[package]] -name = "thiserror-impl" -version = "1.0.59" +name = "thread_local" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" dependencies = [ - "proc-macro2", - "quote", - "syn", + "cfg-if", ] [[package]] @@ -1169,21 +984,6 @@ 
dependencies = [ "serde_json", ] -[[package]] -name = "tinyvec" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" - [[package]] name = "tokio" version = "1.37.0" @@ -1215,31 +1015,43 @@ dependencies = [ ] [[package]] -name = "tokio-tungstenite" -version = "0.21.0" +name = "tower" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ + "futures-core", "futures-util", - "log", + "pin-project", + "pin-project-lite", "tokio", - "tungstenite", + "tower-layer", + "tower-service", + "tracing", ] [[package]] -name = "tokio-util" -version = "0.7.10" +name = "tower-http" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ + "bitflags 2.10.0", "bytes", - "futures-core", - "futures-sink", + "http", + "http-body", + "http-body-util", "pin-project-lite", - "tokio", - "tracing", + "tower-layer", + "tower-service", ] +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.2" @@ -1248,108 +1060,72 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] [[package]] -name = "tracing-core" -version = "0.1.32" +name = "tracing-attributes" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ - "once_cell", + "proc-macro2", + "quote", + "syn", ] [[package]] -name = "try-lock" -version = "0.2.5" +name = "tracing-core" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +dependencies = [ + "once_cell", + "valuable", +] [[package]] -name = "tungstenite" -version = "0.21.0" +name = "tracing-log" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http 1.1.0", - "httparse", "log", - "rand", - "sha1", - "thiserror", - "url", - 
"utf-8", + "once_cell", + "tracing-core", ] [[package]] -name = "typenum" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" - -[[package]] -name = "unicase" -version = "2.7.0" +name = "tracing-subscriber" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" dependencies = [ - "version_check", + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] -[[package]] -name = "unicode-bidi" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" - [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" -[[package]] -name = "unicode-normalization" -version = "0.1.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" -dependencies = [ - "tinyvec", -] - -[[package]] -name = "url" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" -dependencies = [ - "form_urlencoded", - "idna", - "percent-encoding", -] - -[[package]] -name = "utf-8" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" - -[[package]] -name = "utf8parse" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" - [[package]] name = "uuid" version = "1.8.0" @@ -1361,10 +1137,10 @@ dependencies = [ ] [[package]] -name = "version_check" -version = "0.9.4" +name = "valuable" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" [[package]] name = "walkdir" @@ -1376,44 +1152,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "want" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" -dependencies = [ - "try-lock", -] - -[[package]] -name = "warp" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "headers", - "http 0.2.12", - "hyper", - "log", - "mime", - "mime_guess", - "multer", - "percent-encoding", - "pin-project", - "scoped-tls", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-tungstenite", - "tokio-util", - "tower-service", - "tracing", -] - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1515,6 +1253,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-sys" version = "0.48.0" @@ -1533,6 +1277,15 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-targets" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index 4ab60b7..5377b8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,21 +3,20 @@ name = "haystackdb" version = "0.1.0" edition = "2021" -[build] -target = "x86_64-unknown-linux-gnu" -flags = ["-C", "target-cpu=native"] - [dependencies] -warp = "0.3" +axum = "0.7" tokio = { version = "1", features = ["full"] } +tower = "0.4" +tower-http = { version = "0.5", features = ["cors"] } serde = { version = "1.0", features = ["derive"] } rayon = "1.10.0" uuid = { version = "1.8.0", features = ["v4", "serde"] } memmap = "0.7.0" -log = "0.4.14" +tracing = "0.1" fs2 = "0.4.0" -env_logger = "0.11.3" +tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } serde_json = "1.0.68" +parking_lot = "0.12" [profile.release] opt-level = 3 diff --git a/src/constants.rs b/src/constants.rs index cef93b1..b0a3f5d 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,2 +1,3 @@ pub const VECTOR_SIZE: usize = 1024; -pub const QUANTIZED_VECTOR_SIZE: usize = VECTOR_SIZE / 8; +pub const BITS_PER_ELEMENT: usize = 8; +pub const QUANTIZED_VECTOR_SIZE: usize = VECTOR_SIZE / BITS_PER_ELEMENT; diff --git a/src/main.rs b/src/main.rs index 8f44465..01f0029 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,143 +1,118 @@ -use env_logger::Builder; use haystackdb::constants::VECTOR_SIZE; use haystackdb::services::CommitService; use haystackdb::services::QueryService; use haystackdb::structures::filters::Filter as QueryFilter; use haystackdb::structures::metadata_index::KVPair; -use log::info; -use log::LevelFilter; -use std::io::Write; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; use std::{self, path::PathBuf}; use tokio::time::{interval, Duration}; +use tracing::info; +use axum::{ + extract::{Path, State}, + routing::{get, post}, + Json, Router, +}; use std::collections::HashMap; -use tokio::sync::OnceCell; -use warp::Filter; -static ACTIVE_NAMESPACES: OnceCell>>>> = - OnceCell::const_new(); +type AppState = Arc>>>; #[tokio::main] async fn main() { - let mut builder = Builder::new(); - builder - .format(|buf, record| writeln!(buf, "{}: {}", record.level(), record.args())) - .filter(None, LevelFilter::Info) + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) .init(); - let active_namespaces = ACTIVE_NAMESPACES - .get_or_init(|| async { Arc::new(Mutex::new(HashMap::new())) }) - .await; - - let search_route = warp::path!("query" / String) - .and(warp::post()) - .and(warp::body::json()) - .and(with_active_namespaces(active_namespaces.clone())) - .then( - |namespace_id: String, body: (Vec, QueryFilter, usize), active_namespaces| async move { - let base_path = PathBuf::from(format!("/workspace/data/{}/current", namespace_id.clone())); - 
-                ensure_namespace_initialized(&namespace_id, &active_namespaces, base_path.clone())
-                    .await;
-
-                let mut query_service = QueryService::new(base_path, namespace_id.clone()).unwrap();
-                let fvec = &body.0;
-                let metadata = &body.1;
-                let top_k = body.2;
-
-                let mut vec: [f32; VECTOR_SIZE] = [0.0; VECTOR_SIZE];
-                fvec.iter()
-                    .enumerate()
-                    .for_each(|(i, &val)| vec[i] = val as f32);
+    let active_namespaces = Arc::new(Mutex::new(HashMap::new()));
 
-                let start = std::time::Instant::now();
+    let app = Router::new()
+        .route("/query/:namespace_id", post(query_handler))
+        .route("/addVector/:namespace_id", post(add_vector_handler))
+        .route("/pitr/:namespace_id/:timestamp", get(pitr_handler))
+        .with_state(active_namespaces);
 
-                let search_result = query_service
-                    .query(&vec, metadata, top_k)
-                    .expect("Failed to query");
+    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
 
-                let duration = start.elapsed();
+    info!("Server listening on 0.0.0.0:8080");
+    axum::serve(listener, app).await.unwrap();
+}
+
+async fn query_handler(
+    Path(namespace_id): Path<String>,
+    State(active_namespaces): State<AppState>,
+    Json(body): Json<(Vec<f32>, QueryFilter, usize)>,
+) -> Json<serde_json::Value> {
+    let base_path = PathBuf::from(format!("/workspace/data/{}/current", namespace_id));
+    ensure_namespace_initialized(&namespace_id, &active_namespaces, base_path.clone()).await;
+
+    let mut query_service = QueryService::new(base_path, namespace_id.clone()).unwrap();
+    let fvec = &body.0;
+    let metadata = &body.1;
+    let top_k = body.2;
+
+    let mut vec: [f32; VECTOR_SIZE] = [0.0; VECTOR_SIZE];
+    fvec.iter()
+        .enumerate()
+        .for_each(|(i, &val)| vec[i] = val as f32);
+
+    let start = std::time::Instant::now();
+
+    let search_result = query_service
+        .query(&vec, metadata, top_k)
+        .expect("Failed to query");
+
+    let duration = start.elapsed();
+
+    info!(?duration, "Query completed");
+    Json(serde_json::to_value(search_result).unwrap())
+}
+
+async fn add_vector_handler(
+    Path(namespace_id): Path<String>,
+    State(active_namespaces): State<AppState>,
+    Json(body): Json<(Vec<f32>, Vec<KVPair>, String)>,
+) -> Json<&'static str> {
+    let base_path = PathBuf::from(format!("/workspace/data/{}/current", namespace_id));
+
+    ensure_namespace_initialized(&namespace_id, &active_namespaces, base_path.clone()).await;
+
+    let mut commit_service = CommitService::new(base_path, namespace_id.clone()).unwrap();
+    let fvec = &body.0;
+    let metadata = &body.1;
+
+    let mut vec: [f32; VECTOR_SIZE] = [0.0; VECTOR_SIZE];
+    fvec.iter()
+        .enumerate()
+        .for_each(|(i, &val)| vec[i] = val as f32);
 
-                println!("Query took {:?} to complete", duration);
-                warp::reply::json(&search_result)
-            },
-        );
-
-    let add_vector_route =
-        warp::path!("addVector" / String)
-            .and(warp::post())
-            .and(warp::body::json())
-            .and(with_active_namespaces(active_namespaces.clone()))
-            .then(
-                |namespace_id: String,
-                 body: (Vec<f32>, Vec<KVPair>, String),
-                 active_namespaces| async move {
-                    let base_path = PathBuf::from(format!(
-                        "/workspace/data/{}/current",
-                        namespace_id.clone()
-                    ));
-
-                    ensure_namespace_initialized(
-                        &namespace_id,
-                        &active_namespaces,
-                        base_path.clone(),
-                    )
-                    .await;
-
-                    let mut commit_service =
-                        CommitService::new(base_path, namespace_id.clone()).unwrap();
-                    let fvec = &body.0;
-                    let metadata = &body.1;
-
-                    let mut vec: [f32; VECTOR_SIZE] = [0.0; VECTOR_SIZE];
-                    fvec.iter()
-                        .enumerate()
-                        .for_each(|(i, &val)| vec[i] = val as f32);
-
-                    // let id = uuid::Uuid::from_str(id_str).unwrap();
-                    commit_service.add_to_wal(vec![vec], vec![metadata.clone()]).expect("Failed to add to WAL");
-                    warp::reply::json(&"Success")
-                },
-            );
-
-    // add a PITR route
-    let pitr_route = warp::path!("pitr" / String / String)
-        .and(warp::get())
-        .and(with_active_namespaces(active_namespaces.clone()))
-        .then(
-            |namespace_id: String, timestamp: String, active_namespaces| async move {
-                println!("PITR for namespace: {}", namespace_id);
-                let base_path =
-                    PathBuf::from(format!("/workspace/data/{}/current", namespace_id.clone()));
-
-                ensure_namespace_initialized(&namespace_id, &active_namespaces, base_path.clone())
-                    .await;
-
-                let mut commit_service =
-                    CommitService::new(base_path, namespace_id.clone()).unwrap();
-
-                let timestamp = timestamp.parse::<u64>().unwrap();
-                commit_service
-                    .recover_point_in_time(timestamp)
-                    .expect("Failed to PITR");
-                warp::reply::json(&"Success")
-            },
-        );
-
-    let routes = search_route
-        .or(add_vector_route)
-        .or(pitr_route)
-        .with(warp::cors().allow_any_origin());
-    warp::serve(routes).run(([0, 0, 0, 0], 8080)).await;
+    commit_service
+        .add_to_wal(vec![vec], vec![metadata.clone()])
+        .expect("Failed to add to WAL");
+
+    Json("Success")
 }
 
-fn with_active_namespaces(
-    active_namespaces: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
-) -> impl Filter<
-    Extract = (Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,),
-    Error = std::convert::Infallible,
-> + Clone {
-    warp::any().map(move || active_namespaces.clone())
+async fn pitr_handler(
+    Path((namespace_id, timestamp)): Path<(String, String)>,
+    State(active_namespaces): State<AppState>,
+) -> Json<&'static str> {
+    info!(namespace_id = %namespace_id, "Executing PITR");
+    let base_path = PathBuf::from(format!("/workspace/data/{}/current", namespace_id));
+
+    ensure_namespace_initialized(&namespace_id, &active_namespaces, base_path.clone()).await;
+
+    let mut commit_service = CommitService::new(base_path, namespace_id.clone()).unwrap();
+
+    let timestamp = timestamp.parse::<u64>().unwrap();
+    commit_service
+        .recover_point_in_time(timestamp)
+        .expect("Failed to PITR");
+
+    Json("Success")
 }
 
 async fn ensure_namespace_initialized(
@@ -145,25 +120,21 @@ async fn ensure_namespace_initialized(
     namespace_id: &String,
     active_namespaces: &Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
     base_path_for_async: PathBuf,
 ) {
-    let mut namespaces = active_namespaces.lock().unwrap();
+    let mut namespaces = active_namespaces.lock();
     if !namespaces.contains_key(namespace_id) {
         let namespace_id_cloned = namespace_id.clone();
         let handle = tokio::spawn(async move {
             let mut interval = interval(Duration::from_secs(10));
             loop {
                 interval.tick().await;
-                println!("Committing for namespace {}", namespace_id_cloned);
+                info!(namespace_id = %namespace_id_cloned, "Starting commit");
                 let start = std::time::Instant::now();
 
-                let commit_worker = std::sync::Arc::new(std::sync::Mutex::new(
+                let commit_worker = Arc::new(Mutex::new(
                     CommitService::new(base_path_for_async.clone(), namespace_id_cloned.clone())
                         .unwrap(),
                 ));
-                commit_worker
-                    .lock()
-                    .unwrap()
-                    .commit()
-                    .expect("Failed to commit");
+                commit_worker.lock().commit().expect("Failed to commit");
 
                 let duration = start.elapsed();
                 info!("Commit worker took {:?} to complete", duration);
             }
diff --git a/src/main1.rs b/src/main1.rs
deleted file mode 100644
index 8334b63..0000000
--- a/src/main1.rs
+++ /dev/null
@@ -1,146 +0,0 @@
-extern crate haystackdb;
-use haystackdb::constants::VECTOR_SIZE;
-use haystackdb::services::commit::CommitService;
-use haystackdb::services::query::QueryService;
-use haystackdb::structures::metadata_index::KVPair;
-use std::fs;
-use std::path::PathBuf;
-use std::str::FromStr;
-use uuid;
-
-fn random_vec() -> [f32; VECTOR_SIZE] {
-    return [0.0; VECTOR_SIZE];
-}
-
-fn main() {
-    let namespace_id = uuid::Uuid::new_v4().to_string();
-    let path = PathBuf::from_str("tests/data")
-        .expect("Failed to create path")
-        .join("namespaces")
-        .join(namespace_id.clone());
-    fs::create_dir_all(&path).expect("Failed to create directory");
-    let mut commit_service = CommitService::new(path.clone(), namespace_id.clone())
-        .expect("Failed to create commit service");
-
-    let start = std::time::Instant::now();
-    // for _ in 0..20000 {
-    //     commit_service
-    //         .add_to_wal(
-    //             vec![random_vec()],
-    //             vec![vec![KVPair {
-    //                 key: "key".to_string(),
-    //                 value: "value".to_string(),
-    //             }]],
-    //         )
-    //         .expect("Failed to add to WAL");
-    // }
-
-    const NUM_VECTORS: usize = 100_000;
-
-    let batch_vectors: Vec<Vec<[f32; VECTOR_SIZE]>> =
-        (0..NUM_VECTORS).map(|_| vec![random_vec()]).collect();
-    let batch_kvs: Vec<Vec<Vec<KVPair>>> = (0..NUM_VECTORS)
-        .map(|_| {
-            vec![vec![KVPair {
-                key: "key".to_string(),
-                value: "value".to_string(),
-            }]]
-        })
-        .collect();
-
-    println!("Batch creation took: {:?}", start.elapsed());
-    commit_service
-        .batch_add_to_wal(batch_vectors, batch_kvs)
-        .expect("Failed to add to WAL");
-
-    println!("Add to WAL took: {:?}", start.elapsed());
-
-    // commit_service
-    //     .add_to_wal(
-    //         vec![[0.0; VECTOR_SIZE]],
-    //         vec![vec![KVPair {
-    //             key: "key".to_string(),
-    //             value: "value".to_string(),
-    //         }]],
-    //     )
-    //     .expect("Failed to add to WAL");
-
-    let start = std::time::Instant::now();
-
-    commit_service.commit().expect("Failed to commit");
-
-    println!("Commit took: {:?}", start.elapsed());
-
-    let mut query_service =
-        QueryService::new(path.clone(), namespace_id).expect("Failed to create query service");
-
-    let _start = std::time::Instant::now();
-
-    const NUM_RUNS: usize = 100;
-
-    let start = std::time::Instant::now();
-
-    for _ in 0..NUM_RUNS {
-        let _ = query_service
-            .query(
-                &[0.0; VECTOR_SIZE],
-                vec![KVPair {
-                    key: "key".to_string(),
-                    value: "value".to_string(),
-                }],
-                1,
-            )
-            .expect("Failed to query");
-
-        // println!("{:?}", result);
-    }
-
-    println!("Query took: {:?}", start.elapsed().div_f32(NUM_RUNS as f32));
-
-    // let result = query_service
-    //     .query(
-    //         &[0.0; VECTOR_SIZE],
-    //         vec![KVPair {
-    //             key: "key".to_string(),
-    //             value: "value".to_string(),
-    //         }],
-    //         1,
-    //     )
-    //     .expect("Failed to query");
-
-    // println!("{:?}", result);
-
-    // println!("Query took: {:?}", start.elapsed());
-}
-
-// fn main() {
-//     let mut storage_manager: StorageManager = StorageManager::new(
-//         PathBuf::from_str("tests/data/test.db").expect("Failed to create path"),
-//     )
-//     .expect("Failed to create storage manager");
-
-//     let mut node: Node = Node::new_leaf(0);
-
-//     for i in 0..2048 {
-//         node.set_key_value(i, uuid::Uuid::new_v4().to_string());
-//     }
-
-//     let serialized = Node::serialize(&node);
-//     let deserialized = Node::deserialize(&serialized);
-
-//     assert_eq!(node, deserialized);
-
-//     let offset = storage_manager
-//         .store_node(&mut node)
-//         .expect("Failed to store node");
-
-//     node.offset = offset;
-
-//     let mut loaded_node = storage_manager
-//         .load_node(offset)
-//         .expect("Failed to load node");
-
-//     loaded_node.offset = offset;
-
-//     assert_eq!(loaded_node, node);
-// }
diff --git a/src/math/gemm.rs b/src/math/gemm.rs
index e287f42..283e972 100644
--- a/src/math/gemm.rs
+++ b/src/math/gemm.rs
@@ -1,10 +1,6 @@
 use crate::constants::VECTOR_SIZE;
 
-pub fn gemm(
-    a: &Vec<[f32; VECTOR_SIZE]>,
-    b: &Vec<[f32; VECTOR_SIZE]>,
-    result: &mut Vec<[f32; VECTOR_SIZE]>,
-) {
+pub fn gemm(a: &[[f32; VECTOR_SIZE]], b: &[[f32; VECTOR_SIZE]], result: &mut [[f32; VECTOR_SIZE]]) {
     for i in 0..a.len() {
         for j in 0..VECTOR_SIZE {
             let mut sum = 0.0_f32;
diff --git a/src/math/gemv.rs b/src/math/gemv.rs
index e5d8a1f..888b882 100644
--- a/src/math/gemv.rs
+++ b/src/math/gemv.rs
@@ -1,6 +1,6 @@
 use crate::constants::VECTOR_SIZE;
 
-pub fn gemv(matrix: &Vec<[f32; VECTOR_SIZE]>, vector: &[f32; VECTOR_SIZE]) -> Vec<f32> {
+pub fn gemv(matrix: &[[f32; VECTOR_SIZE]], vector: &[f32; VECTOR_SIZE]) -> Vec<f32> {
     let mut result = vec![0f32; VECTOR_SIZE];
     for (i, row) in matrix.iter().enumerate() {
         let mut sum = 0f32;
diff --git a/src/services/commit.rs b/src/services/commit.rs
index 98c62e1..36682d9 100644
--- a/src/services/commit.rs
+++ b/src/services/commit.rs
@@ -8,6 +8,7 @@ use std::collections::HashMap;
 use std::io;
 use std::os::unix::fs as unix_fs;
 use std::path::PathBuf;
+use tracing::{info, debug};
 
 pub struct CommitService {
     pub state: NamespaceState,
@@ -24,13 +25,11 @@ impl CommitService {
 
         let commits_len = commits.len();
 
-        if commits.len() == 0 {
+        if commits.is_empty() {
             return Ok(());
         }
 
-        println!("Commits: {:?}", commits_len);
-
-        let mut processed = 0;
+        info!(commits_len, "Starting commit processing");
 
         let merged_commits = commits
             .iter()
@@ -44,10 +43,7 @@ impl CommitService {
             items
         });
 
-        for (vectors, kvs) in vec![merged_commits] {
-            // let vectors = commit.vectors;
-            // let kvs = commit.kvs;
-
+        for (processed, (vectors, kvs)) in [merged_commits].iter().enumerate() {
             if vectors.len() != kvs.len() {
                 return Err(io::Error::new(
                     io::ErrorKind::InvalidInput,
@@ -55,33 +51,26 @@ impl CommitService {
                 ));
             }
 
-            println!(
-                "Processing commit: {} of {} with vectors of len: {}",
+            debug!(
                 processed,
-                commits_len,
-                vectors.len()
+                total = commits_len,
+                vectors_len = vectors.len(),
+                "Processing commit"
             );
 
-            processed += 1;
-
-            // generate u128 ids
             let ids = (0..vectors.len())
                 .map(|_| uuid::Uuid::new_v4().as_u128())
                 .collect::<Vec<_>>();
 
-            println!("Generated ids");
-
-            let vector_indices = self.state.vectors.batch_push(vectors)?;
+            debug!(ids_len = ids.len(), "Generated IDs");
 
-            println!("Vector indices: {:?}", vector_indices);
+            let vector_indices = self.state.vectors.batch_push(vectors.clone())?;
 
-            println!("Pushed vectors");
+            debug!(indices_count = vector_indices.len(), "Pushed vectors");
 
             let mut inverted_index_items: HashMap<KVPair, Vec<(usize, u128)>> = HashMap::new();
-
-            // let mut metadata_index_items = Vec::new();
-
             let mut batch_metadata_to_insert = Vec::new();
 
             for (idx, kv) in kvs.iter().enumerate() {
@@ -89,30 +78,14 @@
                     id: ids[idx],
                     kvs: kv.clone(),
                     vector_index: vector_indices[idx],
-                    // namespaced_id: self.state.namespace_id.clone(),
                 };
 
-                // println!("Inserting id: {}, {} of {}", ids[idx], idx, ids.len());
-
                 batch_metadata_to_insert.push((ids[idx], metadata_index_item));
 
-                // self.state
-                //     .metadata_index
-                //     .insert(ids[idx], metadata_index_item);
-
                 for kv in kv {
-                    // let inverted_index_item = InvertedIndexItem {
-                    //     indices: vec![vector_indices[idx]],
-                    //     ids: vec![ids[idx]],
-                    // };
-
-                    // self.state
-                    //     .inverted_index
-                    //     .insert_append(kv.clone(), inverted_index_item);
-
                     inverted_index_items
                         .entry(kv.clone())
-                        .or_insert_with(Vec::new)
+                        .or_default()
                        .push((vector_indices[idx], ids[idx]));
                 }
             }
@@ -121,8 +94,6 @@
                 .metadata_index
                 .batch_insert(batch_metadata_to_insert);
 
-            // self.state.metadata_index.batch_insert(metadata_index_items);
-
             for (kv, items) in inverted_index_items {
                 let inverted_index_item = InvertedIndexItem {
                     indices: items.iter().map(|(idx, _)| *idx).collect(),
@@ -143,14 +114,14 @@ impl CommitService {
     }
 
     pub fn recover_point_in_time(&mut self, timestamp: u64) -> io::Result<()> {
-        println!("Recovering to timestamp: {}", timestamp);
+        info!(timestamp, "Starting point-in-time recovery");
         let versions: Vec<usize> = self.state.get_all_versions()?;
         let max_version = versions.iter().max().unwrap();
         let new_version = max_version + 1;
 
-        println!("Versions: {:?}", versions);
+        debug!(?versions, "Available versions");
 
-        println!("Creating new version: {}", new_version);
+        info!(new_version, "Creating new version");
 
         let new_version_path = self
             .state
@@ -165,15 +136,13 @@
         let commits = self.state.wal.get_commits_before(timestamp)?;
         let commits_len = commits.len();
 
-        if commits.len() == 0 {
+        if commits.is_empty() {
             return Ok(());
         }
 
-        println!("Commits to PITR: {:?}", commits_len);
+        info!(commits_len, "Processing commits for PITR");
 
-        let mut processed = 0;
-
-        for commit in commits.iter() {
+        for (processed, commit) in commits.iter().enumerate() {
             let vectors = commit.vectors.clone();
             let kvs = commit.kvs.clone();
 
@@ -184,25 +153,23 @@
                 ));
             }
 
-            println!(
-                "Processing commit: {} of {} with vectors of len: {}",
+            debug!(
                 processed,
-                commits_len,
-                vectors.len()
+                total = commits_len,
+                vectors_len = vectors.len(),
+                "Processing PITR commit"
             );
 
-            processed += 1;
-
-            // generate u128 ids
             let ids = (0..vectors.len())
                 .map(|_| uuid::Uuid::new_v4().as_u128())
                 .collect::<Vec<_>>();
 
-            println!("Generated ids");
+            debug!(ids_len = ids.len(), "Generated IDs");
 
             let vector_indices = fresh_state.vectors.batch_push(vectors)?;
 
-            println!("Pushed vectors");
+            debug!(indices_count = vector_indices.len(), "Pushed vectors");
 
             let mut inverted_index_items: HashMap<KVPair, Vec<(usize, u128)>> = HashMap::new();
 
@@ -213,17 +180,14 @@
                     id: ids[idx],
                     kvs: kv.clone(),
                     vector_index: vector_indices[idx],
-                    // namespaced_id: self.state.namespace_id.clone(),
                 };
 
-                // println!("Inserting id: {}, {} of {}", ids[idx], idx, ids.len());
-
                 metadata_index_items.push((ids[idx], metadata_index_item));
 
                 for kv in kv {
                     inverted_index_items
                         .entry(kv.clone())
-                        .or_insert_with(Vec::new)
+                        .or_default()
                         .push((vector_indices[idx], ids[idx]));
                 }
             }
@@ -249,7 +213,7 @@
         // update symlink for /current
         let current_path = self.state.path.clone();
 
-        println!("Removing current symlink: {:?}", current_path);
+        info!(?current_path, "Updating current symlink");
         std::fs::remove_file(&current_path)?;
 
         unix_fs::symlink(&new_version_path, &current_path)?;
diff --git a/src/services/lock_service.rs b/src/services/lock_service.rs
index 23e81e7..5b0c6b8 100644
--- a/src/services/lock_service.rs
+++ b/src/services/lock_service.rs
@@ -18,6 +18,7 @@ impl LockService {
             .read(true)
             .write(true)
             .create(true)
+            .truncate(true)
             .open(&path)?;
         file.lock_exclusive()?;
         Ok(())
diff --git a/src/services/namespace_state.rs b/src/services/namespace_state.rs
index 22ca8a8..a0aedbd 100644
--- a/src/services/namespace_state.rs
+++ b/src/services/namespace_state.rs
@@ -21,17 +21,16 @@ pub struct NamespaceState {
 
 fn get_all_versions(path: &Path) -> io::Result<Vec<usize>> {
     let mut versions = Vec::new();
-    for entry in fs::read_dir(&path)? {
+    for entry in fs::read_dir(path)? {
         let entry = entry?;
         let path = entry.path();
         if path.is_dir() {
             let version = path.file_name().unwrap().to_str().unwrap().to_string();
             // parse as int without `v` to see if it's a version
             // must remove the v first
-            if version.starts_with("v") {
-                let version = version[1..].parse::<usize>();
-                if version.is_ok() {
-                    versions.push(version.unwrap());
+            if let Some(stripped) = version.strip_prefix("v") {
+                if let Ok(version) = stripped.parse::<usize>() {
+                    versions.push(version);
                 }
             }
         }
@@ -43,20 +42,16 @@ impl NamespaceState {
     pub fn new(path: PathBuf, namespace_id: String) -> io::Result<Self> {
         // path should be .../current, which should be a symlink to the current version
 
-        // println!("Creating namespace state with path: {:?}", path);
-
         if !path.exists() {
-            fs::create_dir_all(&path.clone().parent().unwrap())
+            fs::create_dir_all(path.clone().parent().unwrap())
                 .expect("Failed to create directory");
         }
 
         let versions = get_all_versions(path.clone().parent().unwrap())?;
 
-        if versions.len() == 0 {
+        if versions.is_empty() {
             // create v0
-            // println!("Creating v0");
             let version_path = path.clone().parent().unwrap().join("v0");
-            // println!("Creating version path: {:?}", version_path);
             fs::create_dir_all(&version_path).expect("Failed to create directory");
 
             // create symlink
@@ -94,24 +89,19 @@ impl NamespaceState {
     pub fn get_all_versions(&self) -> io::Result<Vec<usize>> {
         let mut versions = Vec::new();
-        for entry in fs::read_dir(&self.path.parent().unwrap())? {
+        for entry in fs::read_dir(self.path.parent().unwrap())? {
             let entry = entry?;
             let path = entry.path();
             if path.is_dir() {
-                // println!("path: {:?}", path);
                 let version = path.file_name().unwrap().to_str().unwrap().to_string();
-                // println!("version: {:?}", version);
                 // parse as int without `v` to see if it's a version
-                // must remove the v first
-                if version.starts_with("v") {
-                    let version = version[1..].parse::<usize>();
-                    if version.is_ok() {
-                        versions.push(version.unwrap());
+                if let Some(stripped) = version.strip_prefix("v") {
+                    if let Ok(version) = stripped.parse::<usize>() {
+                        versions.push(version);
                     }
                 }
             }
         }
-        // println!("versions: {:?}", versions);
         Ok(versions)
     }
 }
diff --git a/src/services/query.rs b/src/services/query.rs
index df944e1..d915a62 100644
--- a/src/services/query.rs
+++ b/src/services/query.rs
@@ -8,6 +8,7 @@ use crate::structures::metadata_index::KVPair;
 use crate::utils::quantize;
 use std::io;
 use std::path::PathBuf;
+use tracing::warn;
 
 pub struct QueryService {
     pub state: NamespaceState,
@@ -37,7 +38,7 @@ impl QueryService {
         let mut current_batch = Vec::new();
 
         for index in indices {
-            if current_batch.len() == 0 {
+            if current_batch.is_empty() {
                 current_batch.push(index);
             } else {
                 let last_index = current_batch[current_batch.len() - 1];
@@ -54,12 +55,10 @@ impl QueryService {
         current_batch.sort();
         current_batch.dedup();
 
-        if current_batch.len() > 0 {
+        if !current_batch.is_empty() {
             batch_indices.push(current_batch);
         }
 
-        // println!("BATCH INDICES: {:?}", batch_indices.len());
-
         let mut top_k_indices = Vec::new();
 
         let top_k_to_use = top_k.min(ids.len());
@@ -71,7 +70,7 @@ impl QueryService {
             .par_iter()
             .enumerate()
             .fold(
-                || Vec::new(),
+                Vec::new,
                 |mut acc, (idx, vector)| {
                     let distance = hamming_distance(&quantized_query_vector, vector);
@@ -91,7 +90,7 @@ impl QueryService {
                 },
             )
             .reduce(
-                || Vec::new(), // Initializer for the reduce step
+                Vec::new, // Initializer for the reduce step
                 |mut a, mut b| {
                     // How to combine results from different threads
                     a.append(&mut b);
@@ -112,7 +111,7 @@ impl QueryService {
                     kvs.push(item.kvs);
                 }
                 None => {
-                    println!("Metadata not found");
+                    warn!(id, "Metadata not found for ID");
                     continue;
                 }
             }
diff --git a/src/structures.rs b/src/structures.rs
index 066f830..6254a01 100644
--- a/src/structures.rs
+++ b/src/structures.rs
@@ -3,5 +3,4 @@ pub mod filters;
 pub mod inverted_index;
 pub mod metadata_index;
 pub mod mmap_tree;
-pub mod tree;
 pub mod wal;
diff --git a/src/structures/dense_vector_list.rs b/src/structures/dense_vector_list.rs
index 8243704..87a7472 100644
--- a/src/structures/dense_vector_list.rs
+++ b/src/structures/dense_vector_list.rs
@@ -3,6 +3,7 @@ use memmap::MmapMut;
 use std::fs::OpenOptions;
 use std::io;
 use std::path::PathBuf;
+use tracing::{debug, error};
 
 const SIZE_OF_U64: usize = std::mem::size_of::<u64>();
 const HEADER_SIZE: usize = SIZE_OF_U64;
@@ -66,7 +67,7 @@ impl DenseVectorList {
     }
 
     fn resize_mmap(&mut self, new_len: usize) -> io::Result<()> {
-        println!("Resizing mmap in DenseVectorList");
+        debug!(new_len, "Resizing mmap in DenseVectorList");
 
         let file = OpenOptions::new()
             .read(true)
@@ -87,25 +88,15 @@ impl DenseVectorList {
         let total_size = vectors.len() * QUANTIZED_VECTOR_SIZE;
         let required_space = start_offset + total_size;
 
-        // println!(
-        //     "Required space: {}, mmap len: {}",
-        //     required_space,
-        //     self.mmap.len()
-        // );
-
         if required_space > self.mmap.len() {
             self.resize_mmap(required_space * 2)?;
         }
 
-        // println!("Batch push");
-
         for (i, vector) in vectors.iter().enumerate() {
             let offset = start_offset + i * QUANTIZED_VECTOR_SIZE;
             self.mmap[offset..offset + QUANTIZED_VECTOR_SIZE].copy_from_slice(vector);
         }
 
-        // println!("Batch push done");
-
         self.used_space += total_size;
         // Update the header in the mmap
         self.mmap[0..HEADER_SIZE].copy_from_slice(&(self.used_space as u64).to_le_bytes());
@@ -120,10 +111,7 @@ impl DenseVectorList {
         let end = offset + QUANTIZED_VECTOR_SIZE;
 
         if end > self.used_space + HEADER_SIZE {
-            // print everything for debugging
-            println!("Offset: {}", offset);
-            println!("End: {}", end);
-            println!("Used space: {}", self.used_space);
+            error!(offset, end, used_space = self.used_space, "Index out of bounds in get");
 
             return Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
@@ -146,23 +134,20 @@ impl DenseVectorList {
         let end = start + num_elements * QUANTIZED_VECTOR_SIZE;
 
         if end > self.used_space + HEADER_SIZE {
-            println!("start: {}", start);
-            println!("End: {}", end);
-            println!("Used space: {}", self.used_space);
-            println!("Num elements: {}", num_elements);
-            println!("Index: {}", index);
+            error!(
+                start,
+                end,
+                used_space = self.used_space,
+                num_elements,
+                index,
+                "Index out of bounds in get_contiguous"
+            );
 
             return Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
                 "Index out of bounds",
             ));
         }
 
-        // let mut vectors = Vec::with_capacity(num_elements);
-        // for i in 0..num_elements {
-        //     let offset = HEADER_SIZE + (index + i) * QUANTIZED_VECTOR_SIZE;
-        //     vectors.push(self.get(index + i)?);
-        // }
-
         // the indices are contiguous, so we can just get a slice of the mmap
         let vectors: &[[u8; QUANTIZED_VECTOR_SIZE]] = unsafe {
             std::slice::from_raw_parts(
@@ -178,6 +163,10 @@ impl DenseVectorList {
         self.used_space / QUANTIZED_VECTOR_SIZE
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.used_space == 0
+    }
+
     pub fn insert(&mut self, index: usize, vector: [u8; QUANTIZED_VECTOR_SIZE]) -> io::Result<()> {
         let offset = HEADER_SIZE + index * QUANTIZED_VECTOR_SIZE;
         let end = offset + QUANTIZED_VECTOR_SIZE;
diff --git a/src/structures/filters.rs b/src/structures/filters.rs
index dda6c10..200a1d4 100644
--- a/src/structures/filters.rs
b/src/structures/filters.rs @@ -28,7 +28,7 @@ impl Filters { pub fn new(indices: Vec, current_ids: Vec) -> Self { Filters { current_indices: indices, - current_ids: current_ids, + current_ids, } } @@ -146,10 +146,6 @@ impl Filters { result } Filter::Eq(key, value) => { - println!( - "Evaluating EQ filter for key: {:?}, value: {:?}", - key, value - ); // Debug output let kv_pair = KVPair::new(key.clone(), value.clone()); // Ensure correct KVPair creation Filters::from_index(index, &kv_pair) } diff --git a/src/structures/inverted_index.rs b/src/structures/inverted_index.rs index 2671d8f..7860cb9 100644 --- a/src/structures/inverted_index.rs +++ b/src/structures/inverted_index.rs @@ -21,84 +21,57 @@ impl Display for InvertedIndexItem { } impl TreeSerialization for InvertedIndexItem { - fn serialize(&self) -> Vec { - let mut serialized = Vec::new(); - - serialized.extend_from_slice(self.indices.len().to_le_bytes().as_ref()); + fn write_to(&self, writer: &mut W) -> std::io::Result<()> { + (self.indices.len() as u64).write_to(writer)?; let len_of_index_bytes: usize = 8; - - serialized.extend_from_slice(len_of_index_bytes.to_le_bytes().as_ref()); + len_of_index_bytes.write_to(writer)?; for index in &self.indices { - serialized.extend_from_slice(index.to_le_bytes().as_ref()); + index.write_to(writer)?; } - serialized.extend_from_slice(self.ids.len().to_le_bytes().as_ref()); + (self.ids.len() as u64).write_to(writer)?; let len_of_id_bytes: usize = 16; - - serialized.extend_from_slice(len_of_id_bytes.to_le_bytes().as_ref()); + len_of_id_bytes.write_to(writer)?; for id in &self.ids { - serialized.extend_from_slice(id.to_le_bytes().as_ref()); + id.write_to(writer)?; } - serialized + Ok(()) + } + + fn serialized_size(&self) -> usize { + 8 + // indices.len() + 8 + // len_of_index_bytes + self.indices.len() * 8 + // indices + 8 + // ids.len() + 8 + // len_of_id_bytes + self.ids.len() * 16 // ids } } impl TreeDeserialization for InvertedIndexItem { - fn deserialize(data: &[u8]) -> Self { - let mut offset = 0; - - let indices_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize; - offset += 8; - // let mut indices = Vec::new(); - let len_of_index_bytes = usize::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - offset += 8; - - let start = offset; - let end = start + indices_len * len_of_index_bytes; - - let indices_bytes = &data[start..end]; - - let indices_chunks = indices_bytes.chunks(len_of_index_bytes); - - // for chunk in indices_chunks { - // let index = usize::from_le_bytes(chunk.try_into().unwrap()); - // indices.push(index); - // } + fn read_from(reader: &mut R) -> std::io::Result { + let indices_len = u64::read_from(reader)? as usize; + let _len_of_index_bytes = usize::read_from(reader)?; // Always 8, ignored - let indices = indices_chunks - .map(|chunk| usize::from_le_bytes(chunk.try_into().unwrap())) - .collect(); - - offset = end; - - let ids_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize; - offset += 8; - // let mut ids = Vec::new(); - let len_of_id_bytes = usize::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - offset += 8; - - // get them all and split the bytes into chunks - - let start = offset; - let end = start + ids_len * len_of_id_bytes; - let ids_bytes = &data[start..end]; + let mut indices = Vec::with_capacity(indices_len); + for _ in 0..indices_len { + indices.push(usize::read_from(reader)?); + } - let ids_chunks = ids_bytes.chunks(len_of_id_bytes); + let ids_len = u64::read_from(reader)? 
+        let _len_of_id_bytes = usize::read_from(reader)?; // Always 16, ignored

-        // for chunk in ids_chunks {
-        //     let id = String::from_utf8(chunk.to_vec()).unwrap();
-        //     ids.push(id);
-        // }
-        let ids = ids_chunks
-            .map(|chunk| u128::from_le_bytes(chunk.try_into().unwrap()))
-            .collect();
+        let mut ids = Vec::with_capacity(ids_len);
+        for _ in 0..ids_len {
+            ids.push(u128::read_from(reader)?);
+        }

-        InvertedIndexItem { indices, ids }
+        Ok(InvertedIndexItem { indices, ids })
     }
 }

@@ -116,13 +89,13 @@ pub fn compress_indices(indices: Vec<usize>) -> Vec<usize> {
     let mut current_start = indices[0];
     let mut count = 1;

-    for i in 1..indices.len() {
-        if indices[i] == current_start + count {
+    for item in indices.iter().skip(1) {
+        if *item == current_start + count {
             count += 1;
         } else {
             compressed.push(current_start);
             compressed.push(count);
-            current_start = indices[i];
+            current_start = *item;
             count = 1;
         }
     }
@@ -153,7 +126,6 @@ impl InvertedIndex {
     }

     pub fn insert(&mut self, key: KVPair, value: InvertedIndexItem, skip_compression: bool) {
-        // println!("Inserting INTO INVERTED INDEX: {:?}", key);
         if !skip_compression {
             let compressed_indices = compress_indices(value.indices);
             let value = InvertedIndexItem {
@@ -164,26 +136,14 @@ impl InvertedIndex {
         } else {
             self.tree.insert(key, value).expect("Failed to insert");
         }
-        // let compressed_indices = compress_indices(value.indices);
-        // let value = InvertedIndexItem {
-        //     indices: compressed_indices,
-        //     ids: value.ids,
-        // };
-        // self.tree.insert(key, value).expect("Failed to insert");
     }

     pub fn get(&mut self, key: KVPair) -> Option<InvertedIndexItem> {
-        // println!("Getting key: {:?}", key);
         match self.tree.search(key) {
             Ok(v) => {
-                // decompress the indices
                 match v {
                     Some(mut item) => {
-                        println!("Search result: {:?}", item); // Add this
-
                         item.indices = decompress_indices(item.indices);
-                        println!("Decompressed indices: {:?}", item.indices); // Check output
-
                         Some(item)
                     }
                     None => None,
@@ -196,7 +156,6 @@ impl InvertedIndex {
     pub fn insert_append(&mut self, key: KVPair, mut value: InvertedIndexItem) {
         match self.get(key.clone()) {
             Some(mut v) => {
-                // v.indices.extend(value.indices);
                 v.ids.extend(value.ids);

                 let mut decompressed = v.indices.clone();
@@ -210,17 +169,12 @@ impl InvertedIndex {
                 decompressed.sort_unstable();
                 decompressed.dedup();

-                // println!("Before compression: {:?}", decompressed);
-
                 v.indices = compress_indices(decompressed);

-                // println!("After compression: {:?}", v.indices);
-
                 self.insert(key, v, true);
             }
             None => {
                 value.indices = compress_indices(value.indices);
-                // println!("Compressed: {:?}", value.indices);
                 self.insert(key, value, true);
             }
         }
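
For reference, compress_indices above run-length encodes a sorted posting list as (start, count) pairs, and decompress_indices expands them back. A minimal standalone sketch of the same scheme (mirroring the functions in the diff, not an exact copy):

// Run-length coding of sorted postings: [1, 2, 3, 7, 8] -> [1, 3, 7, 2].
fn compress(indices: &[usize]) -> Vec<usize> {
    let mut out = Vec::new();
    let Some((&first, rest)) = indices.split_first() else {
        return out;
    };
    let (mut start, mut count) = (first, 1);
    for &i in rest {
        if i == start + count {
            count += 1; // extends the current run
        } else {
            out.extend([start, count]);
            (start, count) = (i, 1);
        }
    }
    out.extend([start, count]);
    out
}

fn decompress(compressed: &[usize]) -> Vec<usize> {
    compressed
        .chunks(2)
        .flat_map(|run| run[0]..run[0] + run[1])
        .collect()
}

fn main() {
    let c = compress(&[1, 2, 3, 7, 8]);
    assert_eq!(c, vec![1, 3, 7, 2]);
    assert_eq!(decompress(&c), vec![1, 2, 3, 7, 8]);
}
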
diff --git a/src/structures/metadata_index.rs b/src/structures/metadata_index.rs
index 502c0c2..594470f 100644
--- a/src/structures/metadata_index.rs
+++ b/src/structures/metadata_index.rs
@@ -8,7 +8,7 @@ use crate::structures::mmap_tree::Tree;

 use super::mmap_tree::serialization::{TreeDeserialization, TreeSerialization};

-#[derive(Debug, Serialize, Deserialize, Clone, Hash)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct KVPair {
     pub key: String,
     pub value: String,
@@ -28,6 +28,13 @@ impl PartialEq for KVPair {

 impl Eq for KVPair {}

+impl Hash for KVPair {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.key.hash(state);
+        self.value.hash(state);
+    }
+}
+
 impl PartialOrd for KVPair {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
@@ -49,33 +56,23 @@ impl Display for KVPair {
 }

 impl TreeSerialization for KVPair {
-    fn serialize(&self) -> Vec<u8> {
-        let mut serialized = Vec::new();
-
-        serialized.extend_from_slice(self.key.len().to_le_bytes().as_ref());
-        serialized.extend_from_slice(self.key.as_bytes());
-        serialized.extend_from_slice(self.value.len().to_le_bytes().as_ref());
-        serialized.extend_from_slice(self.value.as_bytes());
+    fn write_to<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
+        (self.key.len() as u64).write_to(writer)?;
+        writer.write_all(self.key.as_bytes())?;
+        (self.value.len() as u64).write_to(writer)?;
+        writer.write_all(self.value.as_bytes())
+    }

-        serialized
+    fn serialized_size(&self) -> usize {
+        8 + self.key.len() + 8 + self.value.len()
     }
 }

 impl TreeDeserialization for KVPair {
-    fn deserialize(data: &[u8]) -> Self {
-        let mut offset = 0;
-
-        let key_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-        offset += 8;
-        let key = String::from_utf8(data[offset..offset + key_len].to_vec()).unwrap();
-        offset += key_len;
-
-        let value_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-        offset += 8;
-        let value = String::from_utf8(data[offset..offset + value_len].to_vec()).unwrap();
-        // offset += value_len;
-
-        KVPair { key, value }
+    fn read_from<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
+        let key = String::read_from(reader)?;
+        let value = String::read_from(reader)?;
+        Ok(KVPair { key, value })
     }
 }

@@ -98,103 +95,49 @@ impl Display for MetadataIndexItem {
 }

 impl TreeSerialization for MetadataIndexItem {
-    fn serialize(&self) -> Vec<u8> {
-        let mut serialized = Vec::new();
-
-        serialized.extend_from_slice(self.kvs.len().to_le_bytes().as_ref());
-        // for kv in &self.kvs {
-        //     serialized.extend_from_slice(kv.key.len().to_le_bytes().as_ref());
-        //     serialized.extend_from_slice(kv.key.as_bytes());
-        //     serialized.extend_from_slice(kv.value.len().to_le_bytes().as_ref());
-        //     serialized.extend_from_slice(kv.value.as_bytes());
-        // }
+    fn write_to<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
+        (self.kvs.len() as u64).write_to(writer)?;
+
         for kv in &self.kvs {
-            let serialized_kv = TreeSerialization::serialize(kv);
-            serialized.extend_from_slice(serialized_kv.len().to_le_bytes().as_ref());
-            serialized.extend_from_slice(serialized_kv.as_ref());
+            (kv.serialized_size() as u64).write_to(writer)?;
+            kv.write_to(writer)?;
         }

-        // serialized.extend_from_slice(self.id.len().to_le_bytes().as_ref());
-        serialized.extend_from_slice(self.id.to_le_bytes().as_ref());
-
-        serialized.extend_from_slice(self.vector_index.to_le_bytes().as_ref());
-
-        // serialized.extend_from_slice(self.namespaced_id.len().to_le_bytes().as_ref());
-        // serialized.extend_from_slice(self.namespaced_id.as_bytes());
+        self.id.write_to(writer)?;
+        self.vector_index.write_to(writer)
+    }

-        serialized
+    fn serialized_size(&self) -> usize {
+        let mut size = 8; // kvs.len()
+        for kv in &self.kvs {
+            size += 8; // kv size prefix
+            size += kv.serialized_size();
+        }
+        size += 16; // id (u128)
+        size += 8; // vector_index (usize)
+        size
     }
 }

 impl TreeDeserialization for MetadataIndexItem {
-    fn deserialize(data: &[u8]) -> Self {
-        let mut offset = 0;
-
-        let kvs_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-        offset += 8;
+    fn read_from<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
+        let kvs_len = u64::read_from(reader)? as usize;
         let mut kvs = Vec::new();
         for _ in 0..kvs_len {
-            // let key_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-            // offset += 8;
-
-            // let key = String::from_utf8(data[offset..offset + key_len].to_vec()).unwrap();
-            // offset += key_len;
-
-            // let value_len =
-            //     u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-            // offset += 8;
-
-            // let value = String::from_utf8(data[offset..offset + value_len].to_vec()).unwrap();
-            // offset += value_len;
-
-            // kvs.push(KVPair { key, value });
-
-            let kv_len =
-                usize::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-            offset += 8;
-
-            let kv = TreeDeserialization::deserialize(&data[offset..offset + kv_len]);
-            offset += kv_len;
-
+            let _kv_len = u64::read_from(reader)?; // Size prefix (ignored, we read until complete)
+            let kv = KVPair::read_from(reader)?;
             kvs.push(kv);
         }

-        // let id_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-        // offset += 8;
-
-        let id = u128::from_le_bytes(data[offset..offset + 16].try_into().unwrap());
-        offset += 16;
-
-        let vector_index = usize::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
-        // offset += 8;
-
-        // let namespaced_id_len =
-        //     u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-        // offset += 8;
+        let id = u128::read_from(reader)?;
+        let vector_index = usize::read_from(reader)?;

-        // let namespaced_id =
-        //     String::from_utf8(data[offset..offset + namespaced_id_len].to_vec()).unwrap();
-        // offset += namespaced_id_len;
-
-        MetadataIndexItem {
+        Ok(MetadataIndexItem {
             kvs,
             id,
             vector_index,
-            // namespaced_id,
-        }
-    }
-}
-
-impl TreeSerialization for u128 {
-    fn serialize(&self) -> Vec<u8> {
-        self.to_le_bytes().to_vec()
-    }
-}
-
-impl TreeDeserialization for u128 {
-    fn deserialize(data: &[u8]) -> Self {
-        u128::from_le_bytes(data.try_into().unwrap())
+        })
     }
 }

@@ -221,9 +164,6 @@ impl MetadataIndex {
     }

     pub fn get(&mut self, key: u128) -> Option<MetadataIndexItem> {
-        match self.tree.search(key) {
-            Ok(v) => v,
-            Err(_) => None,
-        }
+        self.tree.search(key).unwrap_or_default()
     }
 }
diff --git a/src/structures/mmap_tree.rs b/src/structures/mmap_tree.rs
index 525754a..e968fff 100644
--- a/src/structures/mmap_tree.rs
+++ b/src/structures/mmap_tree.rs
@@ -5,13 +5,14 @@ pub mod storage;

 use std::fmt::{Debug, Display};
 use std::io;
 use std::path::PathBuf;
+use tracing::{error, warn};

 use node::{Node, NodeType};
 use serialization::{TreeDeserialization, TreeSerialization};
 use storage::StorageManager;

 pub struct Tree<K, V> {
-    pub b: usize,
+    pub branching_factor: usize,
     pub storage_manager: storage::StorageManager<K, V>,
 }

@@ -23,41 +24,30 @@ where
     pub fn new(path: PathBuf) -> io::Result<Self> {
         let mut storage_manager = StorageManager::<K, V>::new(path)?;

-        // println!("INIT Used space: {}", storage_manager.used_space);
-
         if storage_manager.used_space() == 0 {
-            let root_offset: usize;
             let mut root = Node::new_leaf(0);
             root.is_root = true;
-            root_offset = storage_manager.store_node(&mut root)?;
+            let root_offset: usize = storage_manager.store_node(&mut root)?;
             storage_manager.set_root_offset(root_offset);
-            // println!("Initialized from scratch with Root offset: {}", root_offset);
         }

         Ok(Tree {
             storage_manager,
-            b: 32,
+            branching_factor: 32,
         })
     }

     pub fn insert(&mut self, key: K, value: V) -> Result<(), io::Error> {
-        // println!("Inserting key: {}, value: {}", key, value);
         let mut root = self
             .storage_manager
             .load_node(self.storage_manager.root_offset())?;

-        // println!("Root offset: {}, {}", self.root_offset, root.offset);
-
         if root.is_full() {
-            // println!("Root is full, needs splitting");
             let mut new_root = Node::new_internal(0);
             new_root.is_root = true;
-            let (median, mut sibling) = root.split(self.b)?;
-            // println!("Root split: median = {}, new sibling created", median);
-            // println!("Root split: median = {}, new sibling created", median);
+            let (median, mut sibling) = root.split(self.branching_factor)?;
             root.is_root = false;
             self.storage_manager.store_node(&mut root)?;
-            // println!("Root stored");
             let sibling_offset = self.storage_manager.store_node(&mut sibling)?;
             new_root.keys.push(median);
             new_root.children.push(self.storage_manager.root_offset()); // old root offset
@@ -70,16 +60,9 @@ where
             sibling.parent_offset = Some(new_root.offset);
             self.storage_manager.store_node(&mut root)?;
             self.storage_manager.store_node(&mut sibling)?;
-            // println!(
-            //     "New root created with children offsets: {} and {}",
-            //     self.root_offset, sibling_offset
-            // );
         }

-        // println!("Inserting into non-full root");
         self.insert_non_full(self.storage_manager.root_offset(), key, value, 0)?;
-        // println!("Inserted key, root offset: {}", self.root_offset);
-
         Ok(())
     }

@@ -92,56 +75,28 @@ where
     ) -> Result<(), io::Error> {
         if depth > 100 {
             // Set a reasonable limit based on your observations
-            println!("Recursion depth limit reached: {}", depth);
+            error!(depth, "Recursion depth limit reached");
             return Ok(());
         }

         let mut node = self.storage_manager.load_node(node_offset)?;
-        // println!(
-        //     "Depth: {}, Node type: {:?}, Keys: {:?}, is_full: {}",
-        //     depth,
-        //     node.node_type,
-        //     node.keys,
-        //     node.is_full()
-        // );

         if node.node_type == NodeType::Leaf {
             let idx = node.keys.binary_search(&key).unwrap_or_else(|x| x);
-            // println!(
-            //     "Inserting into leaf node: key: {}, len: {}",
-            //     key,
-            //     node.keys.len()
-            // );
-            // println!(
-            //     "Inserting into leaf node: key: {}, idx: {}, node_offset: {}",
-            //     key, idx, node_offset
-            // );

             if node.keys.get(idx) == Some(&key) {
                 node.values[idx] = Some(value);
-                // println!(
-                //     "Storing leaf node with keys: {:?}, offset: {}",
-                //     node.keys, node.offset
-                // );
                 self.storage_manager.store_node(&mut node)?;
                 if node.is_root {
-                    // println!("Updating root offset to: {}", node.offset);
-                    // self.root_offset = node.offset.clone();
                     self.storage_manager.set_root_offset(node.offset);
                 }
             } else {
                 node.keys.insert(idx, key);
                 node.values.insert(idx, Some(value));
-                // println!(
-                //     "Storing leaf node with keys: {:?}, offset: {}",
-                //     node.keys, node.offset
-                // );
                 self.storage_manager.store_node(&mut node)?;
                 if node.is_root {
-                    // println!("Updating root offset to: {}", node.offset);
-                    // self.root_offset = node.offset.clone();
                     self.storage_manager.set_root_offset(node.offset);
                 }
             }
@@ -151,8 +106,7 @@ where
             let mut child = self.storage_manager.load_node(child_offset)?;

             if child.is_full() {
-                // println!("Child is full, needs splitting");
-                let (median, mut sibling) = child.split(self.b)?;
+                let (median, mut sibling) = child.split(self.branching_factor)?;
                 let sibling_offset = self.storage_manager.store_node(&mut sibling)?;

                 node.keys.insert(idx, median.clone());
@@ -177,7 +131,6 @@ where
     }

     fn search_node(&mut self, node_offset: usize, key: K) -> Result<Option<V>, io::Error> {
-        // println!("Searching for key: {} at offset: {}", key, node_offset);
         let node = self.storage_manager.load_node(node_offset)?;

         match node.node_type {
@@ -247,7 +200,7 @@ where
             }
             NodeType::Leaf => {
                 let mut idx = node.keys.binary_search(&start).unwrap_or_else(|x| x);
-                if node.keys.len() == 0 {
+                if node.keys.is_empty() {
                     return Ok(());
                 }
                 if idx == node.keys.len() {
@@ -268,7 +221,7 @@ where

     pub fn batch_insert(&mut self, entries: Vec<(K, V)>) -> Result<(), io::Error> {
         if entries.is_empty() {
-            println!("No entries to insert");
+            warn!("No entries to insert in batch_insert");
             return Ok(());
         }

@@ -289,12 +242,11 @@ where
             }

             if node.is_full() {
-                let (median, mut sibling) = node.split(self.b)?;
+                let (median, mut sibling) = node.split(self.branching_factor)?;
                 let sibling_offset = self.storage_manager.store_node(&mut sibling)?;
                 self.storage_manager.store_node(&mut node)?; // Store changes to the original node after splitting

                 if node.is_root {
-                    // println!("Creating new root");
                     // Create a new root if the current node is the root
                     let mut new_root = Node::new_internal(0);
                     new_root.is_root = true;
@@ -307,13 +259,11 @@ where
                     node.is_root = false;
                     node.parent_offset = Some(new_root_offset);
                     sibling.parent_offset = Some(new_root_offset);
-                    // println!("New root offset: {}", new_root_offset);
                     self.storage_manager.store_node(&mut node)?;
                     self.storage_manager.store_node(&mut sibling)?;
                 } else {
                     // Update the parent node with the new median
                     let parent_offset = node.parent_offset.unwrap();
-                    // println!("Parent offset: {}", parent_offset);
                     let mut parent = self.storage_manager.load_node(parent_offset)?;
                     let idx = parent
                         .keys
@@ -353,4 +303,9 @@ where

         Ok(current_offset)
     }
+
+    /// Flush all changes to disk for durability
+    pub fn flush(&mut self) -> io::Result<()> {
+        self.storage_manager.flush()
+    }
 }
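
With flush() exposed, callers choose the durability point explicitly. A minimal usage sketch (the key/value types and path are assumptions; new, insert, search, and flush are the methods this diff touches):

// Assumes Tree from this crate, with u128 keys and String values, both of
// which implement TreeSerialization/TreeDeserialization after this change.
use std::path::PathBuf;

fn tree_example() -> std::io::Result<()> {
    let mut tree: Tree<u128, String> = Tree::new(PathBuf::from("/tmp/example.tree"))?;
    tree.insert(42, "hello".to_string())?;
    tree.flush()?; // explicit durability point; writes only lived in the mmap until now
    assert_eq!(tree.search(42)?.as_deref(), Some("hello"));
    Ok(())
}
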
diff --git a/src/structures/mmap_tree/node.rs b/src/structures/mmap_tree/node.rs
index 1d2a4c4..99a4c1c 100644
--- a/src/structures/mmap_tree/node.rs
+++ b/src/structures/mmap_tree/node.rs
@@ -74,8 +74,7 @@ where
         match self.node_type {
             NodeType::Internal => {
                 if b <= 1 || b > self.keys.len() {
-                    return Err(io::Error::new(
-                        io::ErrorKind::Other,
+                    return Err(io::Error::other(
                         "Invalid split point for internal node",
                     ));
                 }
@@ -103,8 +102,7 @@ where
             }
             NodeType::Leaf => {
                 if b < 1 || b >= self.keys.len() {
-                    return Err(io::Error::new(
-                        io::ErrorKind::Other,
+                    return Err(io::Error::other(
                         "Invalid split point for leaf node",
                     ));
                 }
@@ -134,7 +132,7 @@ where
     pub fn is_full(&self) -> bool {
         let b = self.max_keys;
-        return self.keys.len() >= (2 * b - 1);
+        self.keys.len() >= (2 * b - 1)
     }

     pub fn serialize(&self) -> Vec<u8> {
@@ -148,7 +146,7 @@ where
         for key in &self.keys {
             let serialized_key = key.serialize();
-            serialized.extend_from_slice(&serialize_length(
+            serialized.extend_from_slice(serialize_length(
                 &mut Vec::new(),
                 serialized_key.len() as u32,
             ));
@@ -159,7 +157,7 @@ where
             match value {
                 Some(value) => {
                     let serialized_value = value.serialize();
-                    serialized.extend_from_slice(&serialize_length(
+                    serialized.extend_from_slice(serialize_length(
                         &mut Vec::new(),
                         serialized_value.len() as u32,
                     ));
@@ -195,7 +193,7 @@ where
         let mut keys = Vec::with_capacity(keys_len);
         for _ in 0..keys_len {
-            let key_size = read_length(&data[offset..offset + 4]) as usize;
+            let key_size = read_length(&data[offset..offset + 4]);
             offset += 4;
             let key = K::deserialize(&data[offset..offset + key_size]);
             offset += key_size;
@@ -204,7 +202,7 @@ where
         let mut values = Vec::with_capacity(values_len);
         for _ in 0..values_len {
-            let value_size = read_length(&data[offset..offset + 4]) as usize;
+            let value_size = read_length(&data[offset..offset + 4]);
             offset += 4;
             let value = if value_size > 0 {
                 Some(V::deserialize(&data[offset..offset + value_size]))
diff --git a/src/structures/mmap_tree/serialization.rs b/src/structures/mmap_tree/serialization.rs
index 9ec9f29..76e4544 100644
--- a/src/structures/mmap_tree/serialization.rs
+++ b/src/structures/mmap_tree/serialization.rs
@@ -1,45 +1,270 @@
+use std::io::{self, Read, Write};
+
+/// Trait for serializing keys and values stored in B-Tree nodes to bytes.
+///
+/// This trait is used by the memory-mapped B-Tree implementation to persist
+/// keys and values to disk. Types that can be stored in the B-Tree must
+/// implement this trait to define their binary representation.
+///
+/// # Examples
+///
+/// ```ignore
+/// impl TreeSerialization for i32 {
+///     fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+///         writer.write_all(&self.to_le_bytes())
+///     }
+///
+///     fn serialized_size(&self) -> usize {
+///         4
+///     }
+/// }
+/// ```
 pub trait TreeSerialization {
-    fn serialize(&self) -> Vec<u8>;
+    /// Writes the serialized value to the provided writer.
+    ///
+    /// The bytes will be written to the memory-mapped file backing the B-Tree.
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()>;
+
+    /// Returns the size in bytes of the serialized representation.
+    ///
+    /// This is used to pre-calculate buffer sizes and avoid unnecessary allocations.
+    fn serialized_size(&self) -> usize;
+
+    /// Serializes the value to a byte vector.
+    ///
+    /// This is a convenience method that allocates a Vec. Prefer using `write_to`
+    /// when possible to avoid allocations.
+    fn serialize(&self) -> Vec<u8> {
+        let mut buf = Vec::with_capacity(self.serialized_size());
+        self.write_to(&mut buf).expect("writing to Vec should not fail");
+        buf
+    }
 }

+/// Trait for deserializing keys and values from bytes back into B-Tree node types.
+///
+/// This trait is the counterpart to `TreeSerialization` and is used to reconstruct
+/// keys and values when loading B-Tree nodes from the memory-mapped file.
+///
+/// Uses a reader that automatically tracks position, making deserialization composable
+/// without manual offset tracking.
+///
+/// # Examples
+///
+/// ```ignore
+/// impl TreeDeserialization for i32 {
+///     fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
+///         let mut bytes = [0; 4];
+///         reader.read_exact(&mut bytes)?;
+///         Ok(i32::from_le_bytes(bytes))
+///     }
+/// }
+/// ```
 pub trait TreeDeserialization {
-    fn deserialize(data: &[u8]) -> Self
+    /// Reads and deserializes a value from the provided reader.
+    ///
+    /// The reader's position advances automatically as data is read.
+    ///
+    /// # Arguments
+    ///
+    /// * `reader` - Reader to deserialize from (e.g., `Cursor<&[u8]>`)
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if reading fails or data is malformed.
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self>
     where
         Self: Sized;
+
+    /// Convenience method to deserialize from a byte slice.
+    ///
+    /// Creates a cursor internally and calls `read_from`.
+    fn deserialize(data: &[u8]) -> Self
+    where
+        Self: Sized,
+    {
+        let mut cursor = std::io::Cursor::new(data);
+        Self::read_from(&mut cursor).expect("deserialization failed")
+    }
 }

 impl TreeDeserialization for i32 {
-    fn deserialize(data: &[u8]) -> Self {
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
         let mut bytes = [0; 4];
-        bytes.copy_from_slice(&data[..4]);
-        i32::from_le_bytes(bytes)
+        reader.read_exact(&mut bytes)?;
+        Ok(i32::from_le_bytes(bytes))
     }
 }

 impl TreeSerialization for i32 {
-    fn serialize(&self) -> Vec<u8> {
-        self.to_le_bytes().to_vec()
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_all(&self.to_le_bytes())
+    }
+
+    fn serialized_size(&self) -> usize {
+        4
+    }
+}
+
+impl TreeDeserialization for u64 {
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let mut bytes = [0; 8];
+        reader.read_exact(&mut bytes)?;
+        Ok(u64::from_le_bytes(bytes))
     }
 }
+
+impl TreeSerialization for u64 {
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_all(&self.to_le_bytes())
+    }
+
+    fn serialized_size(&self) -> usize {
+        8
+    }
+}
+
+impl TreeDeserialization for u128 {
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let mut bytes = [0; 16];
+        reader.read_exact(&mut bytes)?;
+        Ok(u128::from_le_bytes(bytes))
+    }
+}
+
+impl TreeSerialization for u128 {
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_all(&self.to_le_bytes())
+    }
+
+    fn serialized_size(&self) -> usize {
+        16
+    }
+}
+
+impl TreeSerialization for usize {
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_all(&self.to_le_bytes())
+    }
+
+    fn serialized_size(&self) -> usize {
+        8
+    }
+}
+
+impl TreeDeserialization for usize {
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let mut bytes = [0; 8];
+        reader.read_exact(&mut bytes)?;
+        Ok(usize::from_le_bytes(bytes))
+    }
+}
+
 impl TreeDeserialization for String {
-    fn deserialize(data: &[u8]) -> Self {
-        if data.len() < 4 {
-            panic!("Data too short to contain length prefix");
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let len = u64::read_from(reader)? as usize;
+        let mut bytes = vec![0u8; len];
+        reader.read_exact(&mut bytes)?;
+        String::from_utf8(bytes).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
+    }
+}
+
+impl TreeSerialization for u32 {
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_all(&self.to_le_bytes())
+    }
+
+    fn serialized_size(&self) -> usize {
+        4
+    }
+}
+
+impl TreeSerialization for u8 {
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_all(&[*self])
+    }
+
+    fn serialized_size(&self) -> usize {
+        1
+    }
+}
+
+impl TreeDeserialization for u8 {
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let mut byte = [0u8; 1];
+        reader.read_exact(&mut byte)?;
+        Ok(byte[0])
+    }
+}
+
+impl<T: TreeSerialization> TreeSerialization for [T] {
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        (self.len() as u64).write_to(writer)?;
+        for item in self {
+            item.write_to(writer)?;
+        }
+        Ok(())
+    }
+
+    fn serialized_size(&self) -> usize {
+        8 + self.iter().map(|item| item.serialized_size()).sum::<usize>()
+    }
+}
+
+impl<T: TreeSerialization, const N: usize> TreeSerialization for [T; N] {
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        // Fixed-size arrays don't need length prefix since size is known at compile time
+        for item in self {
+            item.write_to(writer)?;
         }
-        let len = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize; // Read length
-        if data.len() < 4 + len {
-            panic!("Data too short for specified string length");
+        Ok(())
+    }
+
+    fn serialized_size(&self) -> usize {
+        self.iter().map(|item| item.serialized_size()).sum::<usize>()
+    }
+}
+
+impl<T: TreeDeserialization, const N: usize> TreeDeserialization for [T; N] {
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
+        // Fixed-size arrays don't have length prefix
+        let mut result = Vec::with_capacity(N);
+        for _ in 0..N {
+            result.push(T::read_from(reader)?);
         }
-        let string_data = &data[4..4 + len]; // Extract string data
-        String::from_utf8(string_data.to_vec()).unwrap()
+        result
+            .try_into()
+            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "array size mismatch"))
+    }
+}
+
+impl<T: TreeSerialization> TreeSerialization for Vec<T> {
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        self.as_slice().write_to(writer)
+    }
+
+    fn serialized_size(&self) -> usize {
+        self.as_slice().serialized_size()
+    }
+}
+
+impl<T: TreeDeserialization> TreeDeserialization for Vec<T> {
+    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let len = u64::read_from(reader)? as usize;
+        let mut result = Vec::with_capacity(len);
+        for _ in 0..len {
+            result.push(T::read_from(reader)?);
+        }
+        Ok(result)
     }
 }

 impl TreeSerialization for String {
-    fn serialize(&self) -> Vec<u8> {
-        let mut data = Vec::new();
-        data.extend_from_slice(&(self.len() as u32).to_le_bytes()); // Write length
-        data.extend_from_slice(self.as_bytes()); // Write string data
-        data
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        self.as_bytes().write_to(writer)
+    }
+
+    fn serialized_size(&self) -> usize {
+        self.as_bytes().serialized_size()
     }
 }
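
The value of the reader/writer pair above is composability: a compound type writes its fields in order and reads them back with no offset bookkeeping. A sketch against these trait shapes, using a hypothetical Point type that is not part of this change (traits assumed in scope):

use std::io::{self, Read, Write};

struct Point {
    x: i32,
    y: i32,
}

impl TreeSerialization for Point {
    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        // Each field's impl advances the writer itself.
        self.x.write_to(writer)?;
        self.y.write_to(writer)
    }

    fn serialized_size(&self) -> usize {
        self.x.serialized_size() + self.y.serialized_size() // 4 + 4 bytes
    }
}

impl TreeDeserialization for Point {
    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
        // Reads compose in write order; the cursor position is the only state.
        Ok(Point {
            x: i32::read_from(reader)?,
            y: i32::read_from(reader)?,
        })
    }
}
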
diff --git a/src/structures/mmap_tree/storage.rs b/src/structures/mmap_tree/storage.rs
index a3d0b47..262688a 100644
--- a/src/structures/mmap_tree/storage.rs
+++ b/src/structures/mmap_tree/storage.rs
@@ -5,7 +5,6 @@ use memmap::MmapMut;
 use std::fs;
 use std::fs::OpenOptions;
 use std::io;
-use std::path::PathBuf;

 use super::serialization::{TreeDeserialization, TreeSerialization};
 use std::fmt::Debug;
@@ -13,7 +12,7 @@ use std::fmt::Debug;
 pub struct StorageManager<K, V> {
     pub mmap: MmapMut,
     pub used_space: usize,
-    path: PathBuf,
+    file: fs::File,
     phantom: std::marker::PhantomData<(K, V)>,
     locks: LockService,
 }
@@ -31,13 +30,13 @@ where
     K: Clone + Ord + TreeSerialization + TreeDeserialization + Debug,
     V: Clone + TreeSerialization + TreeDeserialization,
 {
-    pub fn new(path: PathBuf) -> io::Result<Self> {
+    pub fn new(path: std::path::PathBuf) -> io::Result<Self> {
         let exists = path.exists();
         let file = OpenOptions::new()
             .read(true)
             .write(true)
             .create(!exists)
-            .open(path.clone())?;
+            .open(&path)?;

         if !exists {
             file.set_len(1_000_000)?;
@@ -46,7 +45,7 @@ where
         let mmap = unsafe { MmapMut::map_mut(&file)? };

         // take path, remove everything after the last dot (the extension), and add _locks
-        let mut locks_path = path.clone().to_str().unwrap().to_string();
+        let mut locks_path = path.to_str().unwrap().to_string();
         let last_dot = locks_path.rfind('.').unwrap();
         locks_path.replace_range(last_dot.., "_locks");

@@ -55,7 +54,7 @@ where
         let mut manager = StorageManager {
             mmap,
             used_space: 0,
-            path,
+            file,
             phantom: std::marker::PhantomData,
             locks: LockService::new(locks_path.into()),
         };
@@ -80,7 +79,7 @@ where

         let serialized_len = serialized.len();

-        let num_blocks_required = (serialized_len + BLOCK_DATA_SIZE - 1) / BLOCK_DATA_SIZE;
+        let num_blocks_required = serialized_len.div_ceil(BLOCK_DATA_SIZE);

         let mut needs_new_blocks = true;

@@ -97,7 +96,7 @@ where
                     .unwrap(),
             );
             prev_num_blocks_required =
-                (prev_serialized_len + BLOCK_DATA_SIZE - 1) / BLOCK_DATA_SIZE;
+                prev_serialized_len.div_ceil(BLOCK_DATA_SIZE);

             needs_new_blocks = num_blocks_required > prev_num_blocks_required;

             // println!(
@@ -111,9 +110,9 @@ where
         //     node.offset, serialized_len
         // );

-        let mut current_block_offset = node.offset.clone();
+        let mut current_block_offset = node.offset;

-        let original_offset = current_block_offset.clone();
+        let original_offset = current_block_offset;

         let mut remaining_bytes_to_write = serialized_len;

@@ -229,8 +228,8 @@ where
     }

     pub fn load_node(&mut self, offset: usize) -> io::Result<Node<K, V>> {
-        let original_offset = offset.clone();
-        let mut offset = offset.clone();
+        let original_offset = offset;
+        let mut offset = offset;

         // println!("Loading node at offset: {}", offset);

@@ -286,7 +285,7 @@ where

             bytes_read += bytes_to_read;

-            serialized.extend_from_slice(&self.read_from_offset(offset, bytes_to_read));
+            serialized.extend_from_slice(self.read_from_offset(offset, bytes_to_read));

             offset += BLOCK_DATA_SIZE;

@@ -317,14 +316,9 @@ where
         let current_len = self.mmap.len();
         let new_len = current_len * 2;

-        let file = OpenOptions::new()
-            .read(true)
-            .write(true)
-            .open(self.path.clone())?; // Ensure this path is handled correctly
-
-        file.set_len(new_len as u64)?;
+        self.file.set_len(new_len as u64)?;

-        self.mmap = unsafe { MmapMut::map_mut(&file)? };
+        self.mmap = unsafe { MmapMut::map_mut(&self.file)? };

         Ok(())
     }
@@ -366,7 +360,13 @@ where

     fn write_to_offset(&mut self, offset: usize, data: &[u8]) {
         self.mmap[offset..offset + data.len()].copy_from_slice(data);
-        // self.mmap.flush().unwrap();
+    }
+
+    /// Flush mmap and sync to disk for durability
+    pub fn flush(&mut self) -> io::Result<()> {
+        self.mmap.flush()?;
+        self.file.sync_all()?;
+        Ok(())
     }

     fn read_from_offset(&self, offset: usize, len: usize) -> &[u8] {
diff --git a/src/structures/tree.rs b/src/structures/tree.rs
deleted file mode 100644
index 5bc73ef..0000000
--- a/src/structures/tree.rs
+++ /dev/null
@@ -1,103 +0,0 @@
-pub mod node;
-pub mod serialization;
-
-use std::fmt::{Debug, Display};
-use std::io;
-
-use node::{Node, NodeType};
-use serialization::{TreeDeserialization, TreeSerialization};
-
-pub struct Tree<K, V> {
-    pub root: Box<Node<K, V>>,
-    pub b: usize,
-}
-
-impl<K, V> Tree<K, V>
-where
-    K: Clone + Ord + TreeSerialization + TreeDeserialization + Display + Debug + Copy,
-    V: Clone + TreeSerialization + TreeDeserialization + Display + Debug,
-{
-    pub fn new() -> Self {
-        Tree {
-            root: Box::new(Node::new_leaf()), // Initially the root is a leaf node
-            b: 4,
-        }
-    }
-
-    pub fn insert(&mut self, key: K, value: V) -> Result<(), io::Error> {
-        let mut root = std::mem::replace(&mut self.root, Box::new(Node::new_leaf()));
-        if self.is_node_full(&root)? {
-            let mut new_root = Node::new_internal();
-            let (median, sibling) = root.split()?;
-            new_root.keys.push(median);
-            new_root.children.push(root);
-            new_root.children.push(Box::new(sibling));
-            root = Box::new(new_root);
-        }
-        self.insert_non_full(&mut *root, key, value)?;
-        self.root = root;
-        Ok(())
-    }
-
-    fn insert_non_full(
-        &mut self,
-        node: &mut Node<K, V>,
-        key: K,
-        value: V,
-    ) -> Result<(), io::Error> {
-        match &mut node.node_type {
-            NodeType::Leaf => {
-                let idx = node.keys.binary_search(&key).unwrap_or_else(|x| x);
-                node.keys.insert(idx, key);
-                node.values.insert(idx, Some(value));
-                Ok(())
-            }
-            NodeType::Internal => {
-                let idx = node.keys.binary_search(&key).unwrap_or_else(|x| x);
-                let child_idx = if idx == node.keys.len() || key < node.keys[idx] {
-                    idx
-                } else {
-                    idx + 1
-                };
-
-                if self.is_node_full(&node.children[child_idx])? {
-                    let (median, sibling) = node.children[child_idx].split()?;
-                    node.keys.insert(idx, median);
-                    node.children.insert(child_idx + 1, Box::new(sibling));
-                    if key >= node.keys[idx] {
-                        self.insert_non_full(&mut *node.children[child_idx + 1], key, value)
-                    } else {
-                        self.insert_non_full(&mut *node.children[child_idx], key, value)
-                    }
-                } else {
-                    self.insert_non_full(&mut *node.children[child_idx], key, value)
-                }
-            }
-        }
-    }
-
-    fn is_node_full(&self, node: &Node<K, V>) -> Result<bool, io::Error> {
-        Ok(node.keys.len() == node.max_keys)
-    }
-
-    pub fn search(&self, key: K) -> Result<Option<V>, io::Error> {
-        self.search_node(&*self.root, key)
-    }
-
-    fn search_node(&self, node: &Node<K, V>, key: K) -> Result<Option<V>, io::Error> {
-        match node.node_type {
-            NodeType::Internal => {
-                let idx = node.keys.binary_search(&key).unwrap_or_else(|x| x);
-                if idx < node.keys.len() && node.keys[idx] == key {
-                    self.search_node(&node.children[idx + 1], key)
-                } else {
-                    self.search_node(&node.children[idx], key)
-                }
-            }
-            NodeType::Leaf => match node.keys.binary_search(&key) {
-                Ok(idx) => Ok(node.values.get(idx).expect("could not get value").clone()),
-                Err(_) => Ok(None),
-            },
-        }
-    }
-}
diff --git a/src/structures/tree/serialization.rs b/src/structures/tree/serialization.rs
index 23604ca..ea1d526 100644
--- a/src/structures/tree/serialization.rs
+++ b/src/structures/tree/serialization.rs
@@ -1,5 +1,21 @@
+use std::io::{self, Write};
+
 pub trait TreeSerialization {
-    fn serialize(&self) -> Vec<u8>;
+    /// Writes the serialized value to the provided writer.
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()>;
+
+    /// Returns the size in bytes of the serialized representation.
+    fn serialized_size(&self) -> usize;
+
+    /// Serializes the value to a byte vector.
+    ///
+    /// This is a convenience method that allocates a Vec. Prefer using `write_to`
+    /// when possible to avoid allocations.
+    fn serialize(&self) -> Vec<u8> {
+        let mut buf = Vec::with_capacity(self.serialized_size());
+        self.write_to(&mut buf).expect("writing to Vec should not fail");
+        buf
+    }
 }

 pub trait TreeDeserialization {
@@ -17,8 +33,12 @@ impl TreeDeserialization for i32 {
 }

 impl TreeSerialization for i32 {
-    fn serialize(&self) -> Vec<u8> {
-        self.to_le_bytes().to_vec()
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_all(&self.to_le_bytes())
+    }
+
+    fn serialized_size(&self) -> usize {
+        4
     }
 }

@@ -39,10 +59,12 @@ impl TreeDeserialization for String {
 }

 impl TreeSerialization for String {
-    fn serialize(&self) -> Vec<u8> {
-        let mut data = Vec::new();
-        data.extend_from_slice(&(self.len() as i32).to_le_bytes());
-        data.extend_from_slice(self.as_bytes());
-        data
+    fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_all(&(self.len() as i32).to_le_bytes())?;
+        writer.write_all(self.as_bytes())
+    }
+
+    fn serialized_size(&self) -> usize {
+        4 + self.len()
     }
 }
diff --git a/src/structures/wal.rs b/src/structures/wal.rs
index e14a0c6..cd3e832 100644
--- a/src/structures/wal.rs
+++ b/src/structures/wal.rs
@@ -36,133 +36,52 @@ impl Display for CommitListItem {
 }

 impl TreeSerialization for CommitListItem {
-    fn serialize(&self) -> Vec<u8> {
-        let mut serialized = Vec::new();
-
-        serialized.extend_from_slice(self.hash.to_le_bytes().as_ref());
-        serialized.extend_from_slice(self.timestamp.to_le_bytes().as_ref());
-
-        serialized.extend_from_slice(self.vectors.len().to_le_bytes().as_ref());
-        for vector in &self.vectors {
-            serialized.extend_from_slice(vector.as_ref());
-        }
-
-        serialized.extend_from_slice(self.kvs.len().to_le_bytes().as_ref());
-        for sub_kvs in &self.kvs {
-            serialized.extend_from_slice(sub_kvs.len().to_le_bytes().as_ref());
-            for kv in sub_kvs {
-                serialized.extend_from_slice(&kv.serialize());
-            }
-        }
+    fn write_to<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
+        self.hash.write_to(writer)?;
+        self.timestamp.write_to(writer)?;
+        self.vectors.write_to(writer)?;
+        self.kvs.write_to(writer)?;
+        Ok(())
+    }

-        serialized
+    fn serialized_size(&self) -> usize {
+        self.hash.serialized_size()
+            + self.timestamp.serialized_size()
+            + self.vectors.serialized_size()
+            + self.kvs.serialized_size()
     }
 }

 impl TreeDeserialization for CommitListItem {
-    fn deserialize(data: &[u8]) -> Self {
-        let mut offset = 0;
-
-        let hash = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
-        offset += 8;
-        let timestamp = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
-        offset += 8;
-
-        let vectors_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-
-        offset += 8;
-
-        let mut vectors = Vec::new();
-        for _ in 0..vectors_len {
-            let mut vector = [0; QUANTIZED_VECTOR_SIZE];
-            vector.copy_from_slice(&data[offset..offset + QUANTIZED_VECTOR_SIZE]);
-            offset += QUANTIZED_VECTOR_SIZE;
-            vectors.push(vector);
-        }
+    fn read_from<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
+        let hash = u64::read_from(reader)?;
+        let timestamp = u64::read_from(reader)?;
+        let vectors = Vec::read_from(reader)?;
+        let kvs = Vec::read_from(reader)?;

-        let kvs_len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-        offset += 8;
-
-        let mut kvs = Vec::new();
-        for _ in 0..kvs_len {
-            let mut sub_kvs = Vec::new();
-            let sub_kvs_len =
-                u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-            offset += 8;
-            for _ in 0..sub_kvs_len {
-                let kv = KVPair::deserialize(&data[offset..]);
-                offset += kv.serialize().len();
-                sub_kvs.push(kv);
-            }
-
-            kvs.push(sub_kvs);
-        }
-
-        CommitListItem {
+        Ok(CommitListItem {
             hash,
             timestamp,
             kvs,
             vectors,
-        }
+        })
     }
 }

 impl TreeSerialization for bool {
-    fn serialize(&self) -> Vec<u8> {
-        let mut serialized = Vec::new();
-
-        serialized.extend_from_slice(&[*self as u8]);
-
-        serialized
-    }
-}
-
-impl TreeDeserialization for bool {
-    fn deserialize(data: &[u8]) -> Self {
-        data[0] == 1
+    fn write_to<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
+        writer.write_all(&[*self as u8])
     }
-}
-
-impl TreeSerialization for u64 {
-    fn serialize(&self) -> Vec<u8> {
-        self.to_le_bytes().to_vec()
-    }
-}

-impl TreeDeserialization for u64 {
-    fn deserialize(data: &[u8]) -> Self {
-        u64::from_le_bytes(data.try_into().unwrap())
+    fn serialized_size(&self) -> usize {
+        1
     }
 }

-impl TreeSerialization for Vec<u64> {
-    fn serialize(&self) -> Vec<u8> {
-        let mut serialized = Vec::new();
-
-        serialized.extend_from_slice(self.len().to_le_bytes().as_ref());
-        for val in self {
-            serialized.extend_from_slice(val.to_le_bytes().as_ref());
-        }
-
-        serialized
-    }
-}
-
-impl TreeDeserialization for Vec<u64> {
-    fn deserialize(data: &[u8]) -> Self {
-        let mut offset = 0;
-
-        let len = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
-        offset += 8;
-
-        let mut vals = Vec::new();
-        for _ in 0..len {
-            let val = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
-            offset += 8;
-            vals.push(val);
-        }
-
-        vals
+impl TreeDeserialization for bool {
+    fn read_from<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
+        let byte = u8::read_from(reader)?;
+        Ok(byte == 1)
     }
 }

@@ -211,6 +130,8 @@ impl WAL {
         };

         self.commit_list.insert(hash, commit_list_item)?;
+        // Flush to ensure durability for WAL
+        self.commit_list.flush()?;

         // self.commit_finish.insert(hash, false)?;

@@ -226,26 +147,6 @@ impl WAL {
         }
     }

-    // pub fn get_commits_after(&self, timestamp: u64) -> Result<Vec<CommitListItem>, io::Error> {
-    //     let hashes = self.timestamps.get_range(timestamp, u64::MAX)?;
-
-    //     let mut commits = Vec::new();
-
-    //     for (_, hash) in hashes {
-    //         match self.commit_list.search(hash) {
-    //             Ok(commit) => match commit {
-    //                 Some(c) => {
-    //                     commits.push(c);
-    //                 }
-    //                 None => {}
-    //             },
-    //             Err(_) => {}
-    //         }
-    //     }
-
-    //     Ok(commits)
-    // }
-
     pub fn get_commits(&mut self) -> Result<Vec<CommitListItem>, io::Error> {
         let start = 0;
         let end = u64::MAX;
@@ -272,14 +173,8 @@ impl WAL {

         for (_, hash) in hash_end {
             for h in hash {
-                match self.commit_list.search(h) {
-                    Ok(commit) => match commit {
-                        Some(c) => {
-                            commits.push(c);
-                        }
-                        None => {}
-                    },
-                    Err(_) => {}
+                if let Ok(Some(c)) = self.commit_list.search(h) {
+                    commits.push(c);
                 }
             }
         }
@@ -308,21 +203,12 @@ impl WAL {

         for (_, hashes) in all_hashes {
             for hash in hashes {
-                match self.commit_finish.has_key(hash) {
-                    Ok(has_key) => {
-                        if !has_key {
-                            match self.commit_list.search(hash) {
-                                Ok(commit) => match commit {
-                                    Some(c) => {
-                                        commits.push(c);
-                                    }
-                                    None => {}
-                                },
-                                Err(_) => {}
-                            }
+                if let Ok(has_key) = self.commit_finish.has_key(hash) {
+                    if !has_key {
+                        if let Ok(Some(c)) = self.commit_list.search(hash) {
+                            commits.push(c);
                         }
                     }
-                    Err(_) => {}
                 }
             }
         }
@@ -362,7 +248,7 @@ impl WAL {
         }

         let quantized_vectors: Vec<[u8; QUANTIZED_VECTOR_SIZE]> =
-            vectors.iter().map(|v| quantize(v)).collect();
+            vectors.iter().map(quantize).collect();

         let hash = self.compute_hash(&quantized_vectors, &kvs);

@@ -383,6 +269,7 @@ impl WAL {

         self.timestamps
             .insert(current_timestamp, current_timestamp_vals)?;
+        self.timestamps.flush()?;

         self.add_to_commit_list(hash, quantized_vectors, kvs)?;

@@ -403,7 +290,7 @@ impl WAL {
         let quantized_vectors: Vec<Vec<[u8; QUANTIZED_VECTOR_SIZE]>> = vectors
             .iter()
-            .map(|v| v.iter().map(|v| quantize(v)).collect())
+            .map(|v| v.iter().map(quantize).collect())
             .collect();

         let mut hashes = Vec::new();
@@ -419,7 +306,7 @@ impl WAL {
         }
         .unwrap_or(Vec::new());

-        for (_i, (v, k)) in quantized_vectors.iter().zip(kvs.iter()).enumerate() {
+        for (v, k) in quantized_vectors.iter().zip(kvs.iter()) {
             let hash = self.compute_hash(v, k);
             hashes.push(hash);

@@ -428,6 +315,7 @@ impl WAL {

         self.timestamps
             .insert(current_timestamp, current_timestamp_vals)?;
+        self.timestamps.flush()?;

         for (hash, (v, k)) in hashes.iter().zip(quantized_vectors.iter().zip(kvs.iter())) {
             self.add_to_commit_list(*hash, v.clone(), k.clone())?;
@@ -438,7 +326,16 @@ impl WAL {

     pub fn mark_commit_finished(&mut self, hash: u64) -> io::Result<()> {
         self.commit_finish.insert(hash, true)?;
+        self.commit_finish.flush()?;
+
+        Ok(())
+    }

+    /// Flush all WAL components to disk for durability
+    pub fn flush(&mut self) -> io::Result<()> {
+        self.commit_list.flush()?;
+        self.timestamps.flush()?;
+        self.commit_finish.flush()?;
         Ok(())
     }
 }
diff --git a/src/utils/quantization.rs b/src/utils/quantization.rs
index 62a1a19..b72469a 100644
--- a/src/utils/quantization.rs
+++ b/src/utils/quantization.rs
@@ -1,9 +1,9 @@
-use crate::constants::{QUANTIZED_VECTOR_SIZE, VECTOR_SIZE};
+use crate::constants::{BITS_PER_ELEMENT, QUANTIZED_VECTOR_SIZE, VECTOR_SIZE};

 pub fn quantize(vec: &[f32; VECTOR_SIZE]) -> [u8; QUANTIZED_VECTOR_SIZE] {
     let mut result = [0; QUANTIZED_VECTOR_SIZE];
     for i in 0..QUANTIZED_VECTOR_SIZE {
-        for j in 0..8 {
+        for j in 0..BITS_PER_ELEMENT {
             result[i] |= ((vec[i * 8 + j] >= 0.0) as u8) << j;
         }
     }
@@ -13,7 +13,7 @@ pub fn quantize(vec: &[f32; VECTOR_SIZE]) -> [u8; QUANTIZED_VECTOR_SIZE] {
 pub fn dequantize(vec: &[u8; QUANTIZED_VECTOR_SIZE]) -> [f32; VECTOR_SIZE] {
     let mut result = [0.0; VECTOR_SIZE];
     for i in 0..QUANTIZED_VECTOR_SIZE {
-        for j in 0..8 {
+        for j in 0..BITS_PER_ELEMENT {
             result[i * 8 + j] = if (vec[i] & (1 << j)) > 0 { 1.0 } else { -1.0 };
         }
     }
diff --git a/tests/filters.rs b/tests/filters.rs
index a3e4729..a7994b1 100644
--- a/tests/filters.rs
+++ b/tests/filters.rs
@@ -39,6 +39,7 @@ mod filters_tests {
                 indices: vec![1, 2],
                 ids: vec![0, 0],
             },
+            false,
         );
         index.insert(
             KVPair::new("page_id".to_string(), "page2".to_string()),
@@ -46,6 +47,7 @@ mod filters_tests {
                 indices: vec![3],
                 ids: vec![0],
             },
+            false,
         );
         index.insert(
             KVPair::new("public".to_string(), "1".to_string()),
@@ -53,6 +55,7 @@ mod filters_tests {
                 indices: vec![1, 3],
                 ids: vec![0, 0],
             },
+            false,
         );
         index.insert(
             KVPair::new("permission_id".to_string(), "3iQK2VC4".to_string()),
@@ -60,6 +63,7 @@ mod filters_tests {
                 indices: vec![2],
                 ids: vec![0],
            },
+            false,
        );
        index.insert(
            KVPair::new("permission_id".to_string(), "wzw8zpnQ".to_string()),
@@ -67,6 +71,7 @@ mod filters_tests {
                indices: vec![3],
                ids: vec![0],
            },
+            false,
        );
        index
    }
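
For context on the BITS_PER_ELEMENT change above: quantize keeps only each f32's sign, packing eight elements per output byte. A self-contained sketch of the packing (sizes assumed for illustration; the crate's constants differ):

const VECTOR_SIZE: usize = 16;
const BITS_PER_ELEMENT: usize = 8; // sign bits packed into each output byte
const QUANTIZED_VECTOR_SIZE: usize = VECTOR_SIZE / 8;

fn quantize(vec: &[f32; VECTOR_SIZE]) -> [u8; QUANTIZED_VECTOR_SIZE] {
    let mut result = [0u8; QUANTIZED_VECTOR_SIZE];
    for i in 0..QUANTIZED_VECTOR_SIZE {
        for j in 0..BITS_PER_ELEMENT {
            // Bit j of byte i is 1 iff element i*8+j is non-negative.
            result[i] |= ((vec[i * 8 + j] >= 0.0) as u8) << j;
        }
    }
    result
}

fn main() {
    let mut v = [0.5f32; VECTOR_SIZE];
    v[0] = -1.0; // negative -> bit 0 of byte 0 stays clear
    let q = quantize(&v);
    assert_eq!(q[0], 0b1111_1110);
    assert_eq!(q[1], 0b1111_1111); // all positive -> all bits set
}
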