146 changes: 128 additions & 18 deletions src/datahike/migrate.clj
@@ -3,7 +3,13 @@
[datahike.constants :as c]
[datahike.datom :as d]
[datahike.db :as db]
[clj-cbor.core :as cbor]))
[clj-cbor.core :as cbor]
[clojure.core.async :as async]
[clojure.java.io :as io]
[taoensso.timbre :as timbre]
[clj-cbor.tags.time :as tags.time])
(:import [java.util.concurrent LinkedBlockingQueue]
[java.sql Timestamp]))

(defn export-db
"Export the database in a flat-file of datoms at path.
@@ -17,23 +23,127 @@
(:attribute-refs? cfg) (remove #(= (d/datom-tx %) c/tx0))
true (map seq)))))

(defn update-max-tx
"Find bigest tx in datoms and update max-tx of db.
Note: the last tx might not be the biggest if the db
has been imported before."
[db datoms]
(assoc db :max-tx (reduce #(max %1 (nth %2 3)) 0 datoms)))
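
;; For illustration, a minimal sketch of what update-max-tx computes
;; (the datom vectors and tx ids below are made up):
(comment
  (update-max-tx {:max-tx 0}
                 [[1 :name "Alice" 536870913]
                  [2 :name "Bob" 536870915]
                  [3 :age 30 536870914]])
  ;; => {:max-tx 536870915}
  )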

(defn- instance-to-date [v]
(if (instance? java.time.Instant v) (java.util.Date/from v) v))

(defn import-db
"Import a flat-file of datoms at path into your database.
Intended as a temporary solution, pending developments in Wanderung."
[conn path]
(println "Preparing import of" path "in batches of 1000")
(let [datoms (->> (cbor/slurp-all path)
(map #(-> (apply d/datom %) (update :v instance-to-date))))]
(swap! conn update-max-tx datoms)
(print "Importing ")
(api/transact conn (vec datoms))))
(defn- instance-to-double [v]
(if (float? v)
(double v)
v))
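
;; These helpers coerce value types that round-trip badly through CBOR
;; (see the bugs referenced in prepare-datom below). A quick illustration,
;; with made-up values:
(comment
  (instance-to-date (java.time.Instant/ofEpochMilli 0))
  ;; => #inst "1970-01-01T00:00:00.000-00:00"
  (instance-to-double (float 1.5))
  ;; => 1.5 (a Double)
  )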

(defn process-cbor-file
"Reads a CBOR file from `filename` and calls `process-fn` this allows for
ingesting a large number of datoms without running out of memory."
[filename process-fn stop-fn]
(with-open [in (io/input-stream filename)]
(loop []
(when-let [data (cbor/decode cbor/default-codec in)]
(process-fn data)
(recur))))
(stop-fn))
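
;; A quick way to exercise process-cbor-file on its own, e.g. to count the
;; datoms in an export before importing it (the path below is made up):
(comment
  (let [n (atom 0)]
    (process-cbor-file "backup.cbor"
                       (fn [_datom] (swap! n inc))
                       (fn [] (println "done:" @n "datoms")))
    @n))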

(defn import-db
  "Import a flat-file of datoms at path into your database.
   Options map (all keys optional):
     :filter-schema?  skip datoms whose attribute is in the :db namespace (default false)
     :sync?           block until the import has finished (default true)
     :load-entities?  use api/load-entities instead of api/transact (default false)
   Returns a zero-argument fn reporting import status."
  [conn path & opts]
(let [options (first opts)
filter-schema? (get options :filter-schema? false)
sync? (get options :sync? true)
load-entities? (get options :load-entities? false)

        start-time (System/currentTimeMillis)
tx-max (atom 0)
datom-count (atom 0)
txn-count (atom 0)
stop (atom false)
processed (atom false)
update-tx-max (fn [tx] (reset! tx-max (max @tx-max tx)))
        q (LinkedBlockingQueue.) ;; thread-safe queue for datoms
prepare-datom (fn [item]
                        ;; track tx-max and the datom count as we stream so we never hold all datoms in memory
(update-tx-max (nth item 3))
(swap! datom-count inc)
(-> (apply d/datom item)
;; convert Instant to Date (previously experienced bug)
(update :v instance-to-date)
;; convert Float to Double (previously reported bug)
(update :v instance-to-double)))
        add-datom (fn [item]
                    (if filter-schema?
                      ;; skip schema datoms: attributes whose keyword is in the :db namespace
                      ;; (the "0000" padding guards against keywords shorter than four characters)
                      (when (-> item (nth 1) (str "0000") (subs 0 4) (not= ":db/"))
                        (.put q (prepare-datom item)))
                      (.put q (prepare-datom item))))
drain-queue (fn []
(let [acc (java.util.ArrayList.)]
                      ;; cap the batch size: if the previous write is slow, the queue could otherwise drain into one oversized transaction
(.drainTo q acc 200000)
(vec acc)))]
    ;; prevent datahike from logging every datom of a failed transaction
(timbre/merge-config! {:min-level [[#{"datahike.writing"} :fatal] [#{"datahike.writer"} :fatal]]})

(async/thread
(process-cbor-file
path
add-datom
(fn []
(reset! processed true))))

(async/thread
(timbre/info "Starting import")
(loop []
(Thread/sleep 100) ; batch writes for improved performance
(when (or @processed (not sync?))
(let [datoms (drain-queue)]
(try
(timbre/debug "loading" (count datoms) "datoms")
(swap! txn-count + (count datoms))

                ;; in sync mode max-tx ends up as the tests expect;
                ;; in async mode max-tx will be tx-max + 1
(when sync?
(swap! conn assoc :max-tx @tx-max))

(if load-entities?
                (api/load-entities conn datoms) ;; load-entities is faster for large datasets
                (api/transact conn datoms)) ;; transact is slower but preserves the max-tx id
(catch Exception e
;; we can't print the message as it contains all the datoms
(timbre/error "Error loading" (count datoms))))))
(when (and (>= @txn-count @datom-count) @processed)
;; stop when we've transacted all datoms
(when-not sync?
(swap! conn assoc :max-tx @tx-max))
(timbre/info "\nImported" @datom-count "datoms in total. \nTime elapsed" (- (System/currentTimeMillis) star-time) "ms")
(reset! stop true))
(when (not @stop) (recur))))

(when sync?
(loop []
(Thread/sleep 100)
(timbre/info "remaining" (- @datom-count @txn-count))
(when (not @stop) (recur))))

    ;; return a status fn so the caller can monitor the running import threads
(fn []
{:complete? @stop
:preprocessed? @processed
:total-datoms @datom-count
:remaining (- @datom-count @txn-count)})))
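
;; In async mode the returned status fn can be polled until the import
;; finishes; a minimal sketch (assumes conn is an open connection and
;; the path is made up):
(comment
  (let [status (import-db conn "backup.cbor" {:sync? false})]
    (while (not (:complete? (status)))
      (println "remaining:" (:remaining (status)))
      (Thread/sleep 500))))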

;; Review comment (maintainer): @alekcz Thank you so much for helping with this!
;; Overall it looks good to me and it is great that this code performs well for
;; you. However, I cannot evaluate this comment block, stub-server is not in
;; scope and the alias d is already loaded. Could you turn this into a test?
(comment

;; include
;; [io.replikativ/datahike-jdbc "0.3.49"]
;; [org.xerial/sqlite-jdbc "3.41.2.2"]
(require '[datahike.migrate :as m] :reload)
(require '[datahike.api :as api])
(require '[clojure.java.io :as io])
(require '[datahike-jdbc.core])
(def dev-db (str "./temp/sqlite/ingest"))
(io/make-parents dev-db)
;; for testing large DBs it's best to use SQLite; the file store may run out of resources (threads or file pointers)
;; my test: 16M datoms imported in 21 seconds with SQLite
(def cfg {:store {:backend :jdbc :dbtype "sqlite" :dbname dev-db}
:keep-history? true
:allow-unsafe-config true
:store-cache-size 20000
:search-cache-size 20000})
(api/create-database cfg)
(def conn (api/connect cfg))
(def status (m/import-db conn "prod.backup.cbor" {:sync? true :filter-schema? false})))
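
;; Responding to the review request above, a minimal sketch of such a test.
;; The mem-store config, entity/tx ids, and temp path are assumptions, not
;; part of this PR; :schema-flexibility :read avoids needing schema datoms.
(comment
  (ns datahike.migrate-test
    (:require [clojure.test :refer [deftest is]]
              [clj-cbor.core :as cbor]
              [datahike.api :as api]
              [datahike.migrate :as m]))

  (deftest import-db-streams-all-datoms-test
    (let [cfg    {:store {:backend :mem :id "import-db-test"}
                  :schema-flexibility :read
                  :keep-history? true}
          path   "/tmp/import-db-test.cbor"
          ;; datoms in the [e a v tx] shape that export-db writes
          datoms [[1 :name "Alice" 536870913]
                  [2 :name "Bob" 536870913]
                  [1 :age 30 536870914]]]
      ;; write a small CBOR export by hand, then import it synchronously
      (cbor/spit-all path datoms)
      (api/create-database cfg)
      (let [conn   (api/connect cfg)
            status (m/import-db conn path {:sync? true})]
        (is (:complete? (status)))
        (is (= 3 (:total-datoms (status))))
        (is (zero? (:remaining (status))))
        (api/delete-database cfg)))))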