From 28e212bf1e5016fb4bf8b525b42250e137f7c221 Mon Sep 17 00:00:00 2001
From: Alex Oloo
Date: Fri, 14 Mar 2025 23:23:11 +0200
Subject: [PATCH 1/4] introduce memory-efficient import-db

---
 src/datahike/migrate.clj | 136 +++++++++++++++++++++++++++++++++------
 1 file changed, 118 insertions(+), 18 deletions(-)

diff --git a/src/datahike/migrate.clj b/src/datahike/migrate.clj
index 31bbc4d1f..9e9aeb354 100644
--- a/src/datahike/migrate.clj
+++ b/src/datahike/migrate.clj
@@ -3,7 +3,13 @@
             [datahike.constants :as c]
             [datahike.datom :as d]
             [datahike.db :as db]
-            [clj-cbor.core :as cbor]))
+            [clj-cbor.core :as cbor]
+            [clojure.core.async :as async]
+            [clojure.java.io :as io]
+            [taoensso.timbre :as timbre]
+            [clj-cbor.tags.time :as tags.time])
+  (:import [java.util.concurrent LinkedBlockingQueue]
+           [java.sql Timestamp]))
 
 (defn export-db
   "Export the database in a flat-file of datoms at path.
@@ -17,23 +23,117 @@
           (:attribute-refs? cfg) (remove #(= (d/datom-tx %) c/tx0))
           true (map seq)))))
 
-(defn update-max-tx
-  "Find bigest tx in datoms and update max-tx of db.
-  Note: the last tx might not be the biggest if the db
-  has been imported before."
-  [db datoms]
-  (assoc db :max-tx (reduce #(max %1 (nth %2 3)) 0 datoms)))
-
 (defn- instance-to-date [v]
   (if (instance? java.time.Instant v)
     (java.util.Date/from v)
     v))
 
-(defn import-db
-  "Import a flat-file of datoms at path into your database.
-  Intended as a temporary solution, pending developments in Wanderung."
-  [conn path]
-  (println "Preparing import of" path "in batches of 1000")
-  (let [datoms (->> (cbor/slurp-all path)
-                    (map #(-> (apply d/datom %) (update :v instance-to-date))))]
-    (swap! conn update-max-tx datoms)
-    (print "Importing ")
-    (api/transact conn (vec datoms))))
+(defn- instance-to-double [v]
+  (if (float? v)
+    (double v)
+    v))
+
+(defn process-cbor-file
+  "Reads a CBOR file from `filename` and calls `process-fn` on each decoded value;
+  this allows ingesting a large number of datoms without running out of memory."
+  [filename process-fn stop-fn]
+  (let [codec (cbor/cbor-codec
+               {:read-handlers {0 :x}
+                :write-handlers tags.time/date-read-handlers})]
+    (with-open [in (io/input-stream filename)]
+      (loop []
+        (when-let [data (cbor/decode cbor/default-codec in)]
+          (process-fn data)
+          (recur))))
+    (stop-fn)))
+
+(defn import-db [conn path & opts]
+  (let [start-time (System/currentTimeMillis)
+        filter-schema? (get opts :filter-schema? false)
+        sync? (get opts :sync? true)
+        tx-max (atom 0)
+        datom-count (atom 0)
+        txn-count (atom 0)
+        stop (atom false)
+        processed (atom false)
+        update-tx-max (fn [tx] (reset! tx-max (max @tx-max tx)))
+        q (LinkedBlockingQueue.) ;; thread-safe queue for datoms
+        prepare-datom (fn [item]
+                        ;; track max-tx as we stream so we never hold all datoms in memory
+                        (update-tx-max (nth item 3))
+                        (swap! datom-count inc)
+                        (-> (apply d/datom item)
+                            ;; convert Instant to Date (previously experienced bug)
+                            (update :v instance-to-date)
+                            ;; convert Float to Double (previously reported bug)
+                            (update :v instance-to-double)))
+        add-datom (fn [item]
+                    (let [datom (prepare-datom item)]
+                      ;; skip schema datoms
+                      (if filter-schema?
+                        (when (-> item (nth 1) (str "0000") (subs 0 4) (not= ":db/")) (.put q datom))
+                        (.put q datom))))
+        drain-queue (fn []
+                      (let [acc (java.util.ArrayList.)]
+                        ;; a max is required: if the previous write is slow, too many datoms would otherwise pile up in one transaction
+                        (.drainTo q acc 200000)
+                        (vec acc)))]
+    ;; prevent all the datoms that failed from being logged
+    (timbre/merge-config!
{:min-level [[#{"datahike.writing"} :fatal] [#{"datahike.writer"} :fatal]]}) + + (async/thread + (timbre/info "Starting import") + (loop [] + (Thread/sleep 100) ; batch writes for improved performance + (let [datoms (drain-queue)] + (try + (timbre/debug "loading" (count datoms) "datoms") + (swap! txn-count + (count datoms)) + (api/load-entities conn datoms) + (catch Exception e + ;; we can't print the message as it contains all the datoms + (timbre/error "Error loading" (count datoms))))) + (when (and (>= @txn-count @datom-count) @processed) + ;; stop when we've transacted all datoms + (timbre/info "\nImported" @datom-count "datoms in total. \nTime elapsed" (- (System/currentTimeMillis) star-time) "ms") + (reset! stop true)) + (when (not @stop) (recur)))) + + (async/thread + (process-cbor-file + path + add-datom + (fn [] + (reset! processed true)))) + + (when sync? + (loop [] + (Thread/sleep 100) + (timbre/info "remaining" (- @datom-count @txn-count)) + (when (not @stop) (recur)))) + + ;; allow the user to stop or monitor the ingestion thread with this atom + (fn [] + {:complete? @stop + :preprocessed? @processed + :total-datoms @datom-count + :remaining (- @datom-count @txn-count)}))) + +(comment + ;; include + ;; [io.replikativ/datahike-jdbc "0.3.49"] + ;; [org.xerial/sqlite-jdbc "3.41.2.2"] + (require '[stub-server.migrate :as m] :reload) + (require '[datahike.api :as d]) + (require '[clojure.java.io :as io]) + (require '[datahike-jdbc.core]) + (def dev-db (str "./temp/sqlite/ingest")) + (io/make-parents dev-db) + ;; for testing large db's it's best to use sqlite. File store may run out of resources threads or pointers + ;; my test 21 seconds with SQLite with 16M datoms + (def cfg {:store {:backend :jdbc :dbtype "sqlite" :dbname dev-db} + :keep-history? true + :allow-unsafe-config true + :store-cache-size 20000 + :search-cache-size 20000}) + (d/create-database cfg) + (def conn (d/connect cfg)) + (def status (m/import-db conn "prod.backup.cbor" {:sync? true :filter-schema? false}))) \ No newline at end of file From bdbfb36ee962f9e196f27814992a11b96ef9238c Mon Sep 17 00:00:00 2001 From: Alex Oloo Date: Fri, 14 Mar 2025 23:35:45 +0200 Subject: [PATCH 2/4] maintain backwards compatibility --- src/datahike/migrate.clj | 54 ++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/src/datahike/migrate.clj b/src/datahike/migrate.clj index 9e9aeb354..62bda63bf 100644 --- a/src/datahike/migrate.clj +++ b/src/datahike/migrate.clj @@ -46,9 +46,11 @@ (stop-fn))) (defn import-db [conn path & opts] - (let [star-time (System/currentTimeMillis) - filter-schema? (get opts :filter-schema? false) + (let [filter-schema? (get opts :filter-schema? false) sync? (get opts :sync? true) + load-entities? (get opts :load-entities? false) + + star-time (System/currentTimeMillis) tx-max (atom 0) datom-count (atom 0) txn-count (atom 0) @@ -66,11 +68,10 @@ ;; convert Float to Double (previously reported bug) (update :v instance-to-double))) add-datom (fn [item] - (let [datom (prepare-datom item)] - ;; skip schema datoms + ;; skip schema datoms (if filter-schema? 
-                        (when (-> item (nth 1) (str "0000") (subs 0 4) (not= ":db/")) (.put q datom))
-                        (.put q datom))))
+                        (when (-> item (nth 1) (str "0000") (subs 0 4) (not= ":db/")) (.put q (prepare-datom item)))
+                        (.put q (prepare-datom item))))
         drain-queue (fn []
                       (let [acc (java.util.ArrayList.)]
                         ;; a max is required: if the previous write is slow, too many datoms would otherwise pile up in one transaction
                         (.drainTo q acc 200000)
                         (vec acc)))]
@@ -79,31 +80,42 @@
     ;; prevent all the datoms that failed from being logged
     (timbre/merge-config!
      {:min-level [[#{"datahike.writing"} :fatal] [#{"datahike.writer"} :fatal]]})
 
+    (async/thread
+      (process-cbor-file
+       path
+       add-datom
+       (fn []
+         (reset! processed true))))
+
     (async/thread
       (timbre/info "Starting import")
       (loop []
        (Thread/sleep 100) ; batch writes for improved performance
-        (let [datoms (drain-queue)]
-          (try
-            (timbre/debug "loading" (count datoms) "datoms")
-            (swap! txn-count + (count datoms))
-            (api/load-entities conn datoms)
-            (catch Exception e
-              ;; we can't print the message as it contains all the datoms
-              (timbre/error "Error loading" (count datoms)))))
+        (when (or @processed (not sync?))
+          (let [datoms (drain-queue)]
+            (try
+              (timbre/debug "loading" (count datoms) "datoms")
+              (swap! txn-count + (count datoms))
+
+              ;; in sync mode max-tx ends up where the tests expect it
+              ;; in async mode max-tx ends up at tx-max + 1
+              (when sync?
+                (swap! conn assoc :max-tx @tx-max))
+
+              (if load-entities?
+                (api/load-entities conn datoms) ;; load entities is faster for large datasets
+                (api/transact conn datoms)) ;; transact is slow but preserves max-tx id
+              (catch Exception e
+                ;; we can't print the message as it contains all the datoms
+                (timbre/error "Error loading" (count datoms))))))
        (when (and (>= @txn-count @datom-count) @processed)
          ;; stop when we've transacted all datoms
+          (when-not sync?
+            (swap! conn assoc :max-tx @tx-max))
          (timbre/info "\nImported" @datom-count "datoms in total. \nTime elapsed" (- (System/currentTimeMillis) start-time) "ms")
          (reset! stop true))
        (when (not @stop) (recur))))
 
-    (async/thread
-      (process-cbor-file
-       path
-       add-datom
-       (fn []
-         (reset! processed true))))
-
     (when sync?
       (loop []
        (Thread/sleep 100)
From 99855b2da3bb260e1ec0fc9f1a23c3aeb8e89e1a Mon Sep 17 00:00:00 2001
From: Alex Oloo
Date: Sun, 16 Mar 2025 16:44:57 +0200
Subject: [PATCH 3/4] remove unused codec

---
 src/datahike/migrate.clj | 27 ++++++++++++---------------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/src/datahike/migrate.clj b/src/datahike/migrate.clj
index 62bda63bf..e17a1803c 100644
--- a/src/datahike/migrate.clj
+++ b/src/datahike/migrate.clj
@@ -35,21 +35,18 @@
   "Reads a CBOR file from `filename` and calls `process-fn` on each decoded value;
   this allows ingesting a large number of datoms without running out of memory."
   [filename process-fn stop-fn]
-  (let [codec (cbor/cbor-codec
-               {:read-handlers {0 :x}
-                :write-handlers tags.time/date-read-handlers})]
-    (with-open [in (io/input-stream filename)]
-      (loop []
-        (when-let [data (cbor/decode cbor/default-codec in)]
-          (process-fn data)
-          (recur))))
-    (stop-fn)))
+  (with-open [in (io/input-stream filename)]
+    (loop []
+      (when-let [data (cbor/decode cbor/default-codec in)]
+        (process-fn data)
+        (recur))))
+  (stop-fn))
 
 (defn import-db [conn path & opts]
   (let [filter-schema? (get opts :filter-schema? false)
         sync? (get opts :sync? true)
         load-entities? (get opts :load-entities? false)
-
+
         start-time (System/currentTimeMillis)
         tx-max (atom 0)
         datom-count (atom 0)
         txn-count (atom 0)
@@ -69,9 +66,9 @@
                            (update :v instance-to-double)))
         add-datom (fn [item]
                    ;; skip schema datoms
-                     (if filter-schema?
-                        (when (-> item (nth 1) (str "0000") (subs 0 4) (not= ":db/")) (.put q (prepare-datom item)))
-                        (.put q (prepare-datom item))))
+                    (if filter-schema?
+                      (when (-> item (nth 1) (str "0000") (subs 0 4) (not= ":db/")) (.put q (prepare-datom item)))
+                      (.put q (prepare-datom item))))
         drain-queue (fn []
                       (let [acc (java.util.ArrayList.)]
                         ;; a max is required: if the previous write is slow, too many datoms would otherwise pile up in one transaction
@@ -96,12 +93,12 @@
            (try
              (timbre/debug "loading" (count datoms) "datoms")
              (swap! txn-count + (count datoms))
-
+
              ;; in sync mode max-tx ends up where the tests expect it
              ;; in async mode max-tx ends up at tx-max + 1
              (when sync?
                (swap! conn assoc :max-tx @tx-max))
-
+
              (if load-entities?
                (api/load-entities conn datoms) ;; load entities is faster for large datasets
                (api/transact conn datoms)) ;; transact is slow but preserves max-tx id
From b60cb2e278371c01612c1d958de2f4ce97a6735f Mon Sep 17 00:00:00 2001
From: Alex Oloo
Date: Sun, 16 Mar 2025 20:56:25 +0200
Subject: [PATCH 4/4] extract opts correctly

---
 src/datahike/migrate.clj | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/datahike/migrate.clj b/src/datahike/migrate.clj
index e17a1803c..6993c2897 100644
--- a/src/datahike/migrate.clj
+++ b/src/datahike/migrate.clj
@@ -43,9 +43,10 @@
   (stop-fn))
 
 (defn import-db [conn path & opts]
-  (let [filter-schema? (get opts :filter-schema? false)
-        sync? (get opts :sync? true)
-        load-entities? (get opts :load-entities? false)
+  (let [options (first opts)
+        filter-schema? (get options :filter-schema? false)
+        sync? (get options :sync? true)
+        load-entities? (get options :load-entities? false)
 
         start-time (System/currentTimeMillis)
         tx-max (atom 0)
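
For reference, a minimal sketch of driving the new import path from a REPL, as the code stands after patch 4. Everything beyond the calls the patches themselves define is an assumption for illustration: the file-store path, the backup filename, the one-second polling interval, and the choice of :load-entities? true are all made up, not part of the patch.

(require '[datahike.api :as d]
         '[datahike.migrate :as m])

;; fresh target database; any supported backend works, and the comment block
;; in patch 1 recommends SQLite via datahike-jdbc for very large imports
(def cfg {:store {:backend :file :path "/tmp/import-target"}
          :keep-history? true})
(d/create-database cfg)
(def conn (d/connect cfg))

;; :sync? false starts the reader and writer threads and returns immediately;
;; the returned fn reports progress
(def status (m/import-db conn "prod.backup.cbor"
                         {:sync? false :filter-schema? false :load-entities? true}))

;; poll the status fn until the writer thread reports completion
(loop []
  (let [{:keys [complete? total-datoms remaining]} (status)]
    (println "imported" (- total-datoms remaining) "of" total-datoms "datoms")
    (when-not complete?
      (Thread/sleep 1000)
      (recur))))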
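
Similarly, a sketch of using process-cbor-file on its own to inspect a backup without holding it in memory; the filename is again hypothetical, and the callbacks here simply count datoms and keep the first few as a sample.

(def datom-count (atom 0))
(def sample (atom []))

(m/process-cbor-file
 "prod.backup.cbor"
 (fn [item]
   ;; called once per decoded datom vector
   (swap! datom-count inc)
   (when (< (count @sample) 5)
     (swap! sample conj item)))
 (fn []
   ;; called once after the whole file has been streamed
   (println "total datoms:" @datom-count)
   (run! println @sample)))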