diff --git a/CHANGELOG.md b/CHANGELOG.md index fc79f725f3..9715f708fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,12 @@ and this project adheres to ### Fixed +- Fix infinite loading when reopening workflow in collaborative editor due to + delta updates not being merged with persisted state + [#4164](https://github.com/OpenFn/lightning/issues/4164) +- Fix incorrect field access and version type in checkpoint creation that would + crash after 500 document updates + [#4176](https://github.com/OpenFn/lightning/issues/4176) - Fix issue where users were not able to go to the latest versin of a workflow [#4149](https://github.com/OpenFn/lightning/issues/4149) - Fix AI Assistant disclaimer not persisting after acceptance diff --git a/lib/lightning/collaboration/document_state.ex b/lib/lightning/collaboration/document_state.ex index 728847052c..c454796495 100644 --- a/lib/lightning/collaboration/document_state.ex +++ b/lib/lightning/collaboration/document_state.ex @@ -10,6 +10,19 @@ defmodule Lightning.Collaboration.DocumentState do use Ecto.Schema import Ecto.Changeset + import Ecto.Query + + alias Lightning.Repo + + @type t :: %__MODULE__{ + id: integer() | nil, + document_name: String.t() | nil, + state_data: binary() | nil, + state_vector: binary() | nil, + version: :update | :checkpoint | :state_vector | nil, + inserted_at: DateTime.t() | nil, + updated_at: DateTime.t() | nil + } schema "collaboration_document_states" do field :document_name, :string @@ -30,4 +43,93 @@ defmodule Lightning.Collaboration.DocumentState do ]) |> validate_required([:document_name, :state_data, :version]) end + + @doc """ + Retrieves the latest checkpoint and all updates since that checkpoint + for a given document. + + Returns `{:ok, checkpoint, updates}` where checkpoint may be nil, + or `{:error, :not_found}` if no persisted state exists. + """ + @spec get_checkpoint_and_updates(String.t()) :: + {:ok, __MODULE__.t() | nil, [__MODULE__.t()]} | {:error, :not_found} + def get_checkpoint_and_updates(doc_name) do + checkpoint = get_latest_checkpoint(doc_name) + + checkpoint_time = + if checkpoint, do: checkpoint.inserted_at, else: ~U[1970-01-01 00:00:00Z] + + updates = get_updates_since(doc_name, checkpoint_time) + + if checkpoint || length(updates) > 0 do + {:ok, checkpoint, updates} + else + {:error, :not_found} + end + end + + @doc """ + Retrieves the latest checkpoint for a document, or nil if none exists. + """ + @spec get_latest_checkpoint(String.t()) :: __MODULE__.t() | nil + def get_latest_checkpoint(doc_name) do + Repo.one( + from d in __MODULE__, + where: d.document_name == ^doc_name and d.version == :checkpoint, + order_by: [desc: d.inserted_at], + limit: 1 + ) + end + + @doc """ + Retrieves all updates for a document inserted after the given timestamp, + ordered chronologically (oldest first). + """ + @spec get_updates_since(String.t(), DateTime.t()) :: [__MODULE__.t()] + def get_updates_since(doc_name, since) do + Repo.all( + from d in __MODULE__, + where: + d.document_name == ^doc_name and + d.version == :update and + d.inserted_at > ^since, + order_by: [asc: d.inserted_at] + ) + end + + @doc """ + Applies persisted state (checkpoint + updates) to a Yex document. + + Applies the checkpoint first (if present), then all updates in + chronological order. + """ + @spec apply_to_doc(Yex.Doc.t(), __MODULE__.t() | nil, [__MODULE__.t()]) :: :ok + def apply_to_doc(doc, checkpoint, updates) do + if checkpoint do + Yex.apply_update(doc, checkpoint.state_data) + end + + Enum.each(updates, fn update -> + Yex.apply_update(doc, update.state_data) + end) + + :ok + end + + @doc """ + Loads all persisted state for a document and applies it to a Yex document. + + Convenience function that combines `get_checkpoint_and_updates/1` and + `apply_to_doc/3`. + """ + @spec load_into_doc(Yex.Doc.t(), String.t()) :: :ok + def load_into_doc(doc, doc_name) do + checkpoint = get_latest_checkpoint(doc_name) + + checkpoint_time = + if checkpoint, do: checkpoint.inserted_at, else: ~U[1970-01-01 00:00:00Z] + + updates = get_updates_since(doc_name, checkpoint_time) + apply_to_doc(doc, checkpoint, updates) + end end diff --git a/lib/lightning/collaboration/persistence.ex b/lib/lightning/collaboration/persistence.ex index 74f48b8d9e..b4facbb737 100644 --- a/lib/lightning/collaboration/persistence.ex +++ b/lib/lightning/collaboration/persistence.ex @@ -11,7 +11,6 @@ defmodule Lightning.Collaboration.Persistence do alias Lightning.Collaboration.DocumentState alias Lightning.Collaboration.PersistenceWriter alias Lightning.Collaboration.Session - alias Lightning.Repo require Logger @@ -27,7 +26,7 @@ defmodule Lightning.Collaboration.Persistence do """ end - case load_document_records(doc_name) do + case DocumentState.get_checkpoint_and_updates(doc_name) do {:ok, checkpoint, updates} -> apply_persisted_state(doc, doc_name, checkpoint, updates) reconcile_or_reset(doc, doc_name, workflow) @@ -46,7 +45,6 @@ defmodule Lightning.Collaboration.Persistence do @impl true def update_v1(state, update, doc_name, _doc) do - # Send to PersistenceWriter via state case PersistenceWriter.add_update(doc_name, update) do :ok -> state @@ -82,51 +80,12 @@ defmodule Lightning.Collaboration.Persistence do end # Private functions - defp load_document_records(doc_name) do - import Ecto.Query - - # Get latest checkpoint - checkpoint = - Repo.one( - from d in DocumentState, - where: d.document_name == ^doc_name and d.version == :checkpoint, - order_by: [desc: d.inserted_at], - limit: 1 - ) - - checkpoint_time = - if checkpoint, do: checkpoint.inserted_at, else: ~U[1970-01-01 00:00:00Z] - - # Get updates after checkpoint - updates = - Repo.all( - from d in DocumentState, - where: - d.document_name == ^doc_name and - d.version == :update and - d.inserted_at > ^checkpoint_time, - order_by: [asc: d.inserted_at] - ) - - if checkpoint || length(updates) > 0 do - {:ok, checkpoint, updates} - else - {:error, :not_found} - end - end - defp apply_persisted_state(doc, doc_name, checkpoint, updates) do - # Apply checkpoint first if exists if checkpoint do Logger.info("Applying checkpoint to document. document=#{doc_name}") - Yex.apply_update(doc, checkpoint.state_data) end - # Then apply updates in chronological order - Enum.each(updates, fn update -> - Yex.apply_update(doc, update.state_data) - end) - + DocumentState.apply_to_doc(doc, checkpoint, updates) Logger.debug("Loaded #{length(updates)} updates. document=#{doc_name}") end @@ -157,13 +116,15 @@ defmodule Lightning.Collaboration.Persistence do case Yex.Map.fetch(workflow_map, "lock_version") do {:ok, version} when is_float(version) -> trunc(version) {:ok, version} when is_integer(version) -> version + {:ok, nil} -> nil :error -> nil end end - defp stale?(persisted_version, current_version) do - persisted_version != current_version and not is_nil(persisted_version) - end + defp stale?(nil, current_version), do: not is_nil(current_version) + + defp stale?(persisted_version, current_version), + do: persisted_version != current_version defp clear_and_reset_workflow(doc, workflow) do # Same pattern as Session.clear_and_reset_doc diff --git a/lib/lightning/collaboration/persistence_writer.ex b/lib/lightning/collaboration/persistence_writer.ex index 0a27a48e23..40af51b8e7 100644 --- a/lib/lightning/collaboration/persistence_writer.ex +++ b/lib/lightning/collaboration/persistence_writer.ex @@ -333,8 +333,10 @@ defmodule Lightning.Collaboration.PersistenceWriter do "Saving #{length(state.pending_updates)} pending updates for document: #{state.document_name}" ) - # Merge multiple updates if we have more than one - merged_update = merge_updates(state.pending_updates) + # Merge multiple updates if we have more than one. + # Pass document_name so merge_updates can load existing persisted state + # before applying deltas (Yjs deltas require the base state to merge correctly). + merged_update = merge_updates(state.pending_updates, state.document_name) document_state = %DocumentState{ document_name: state.document_name, @@ -343,7 +345,7 @@ defmodule Lightning.Collaboration.PersistenceWriter do } case Repo.insert(document_state) do - {:ok, _} -> + {:ok, _inserted} -> {:ok, length(state.pending_updates)} {:error, reason} -> @@ -355,13 +357,19 @@ defmodule Lightning.Collaboration.PersistenceWriter do {:error, exception} end - defp merge_updates([update]), do: update + defp merge_updates([update], _document_name), do: update - defp merge_updates(updates) when length(updates) > 1 do - # Create a temporary document and apply all updates in reverse order - # (since we store them in reverse chronological order) + defp merge_updates(updates, document_name) when length(updates) > 1 do + # Load the current persisted state first, then apply the new deltas. + # This is necessary because Yjs delta updates only contain changes, + # not the full state. Without the base state, applying deltas to an + # empty document produces an empty result. temp_doc = Yex.Doc.new() + # Load existing persisted state (checkpoint + any previously saved updates) + DocumentState.load_into_doc(temp_doc, document_name) + + # Apply new updates in chronological order (oldest first) updates |> Enum.reverse() |> Enum.each(fn update -> @@ -408,12 +416,12 @@ defmodule Lightning.Collaboration.PersistenceWriter do # Apply checkpoint first if it exists if latest_checkpoint do - Yex.apply_update(temp_doc, latest_checkpoint.data) + Yex.apply_update(temp_doc, latest_checkpoint.state_data) end # Apply all updates in order Enum.each(updates, fn update_record -> - Yex.apply_update(temp_doc, update_record.data) + Yex.apply_update(temp_doc, update_record.state_data) end) # Create new checkpoint @@ -421,7 +429,7 @@ defmodule Lightning.Collaboration.PersistenceWriter do checkpoint = %DocumentState{ document_name: document_name, - version: "checkpoint", + version: :checkpoint, state_data: checkpoint_data } diff --git a/test/lightning/collaboration/document_supervisor_test.exs b/test/lightning/collaboration/document_supervisor_test.exs index 5c1f4757a2..1b5e881a95 100644 --- a/test/lightning/collaboration/document_supervisor_test.exs +++ b/test/lightning/collaboration/document_supervisor_test.exs @@ -1,7 +1,9 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do use Lightning.DataCase, async: false + alias Lightning.Collaboration.DocumentState alias Lightning.Collaboration.DocumentSupervisor + alias Lightning.Collaboration.PersistenceWriter alias Lightning.Collaboration.Registry import Eventually @@ -812,4 +814,165 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do verify_cleanup(document_name, workflow_id) end end + + describe "9. Checkpoint Creation" do + test "creates checkpoint from persisted updates", %{workflow: workflow} do + document_name = "workflow:#{workflow.id}" + + # Create some initial document state with Y.Doc data + doc = Yex.Doc.new() + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "setup", fn -> + Yex.Map.set(workflow_map, "id", workflow.id) + Yex.Map.set(workflow_map, "name", "Test Workflow") + Yex.Map.set(workflow_map, "lock_version", 1) + end) + + {:ok, update_data} = Yex.encode_state_as_update(doc) + + # Insert as an update (not checkpoint) + {:ok, _} = + Repo.insert(%DocumentState{ + document_name: document_name, + state_data: update_data, + version: :update + }) + + # Add a second update with additional data + Yex.Doc.transaction(doc, "add_concurrency", fn -> + Yex.Map.set(workflow_map, "concurrency", 10) + end) + + {:ok, update_data2} = Yex.encode_state_as_update(doc) + + {:ok, _} = + Repo.insert(%DocumentState{ + document_name: document_name, + state_data: update_data2, + version: :update + }) + + # Start PersistenceWriter + {:ok, persistence_writer} = + PersistenceWriter.start_link( + document_name: document_name, + name: Registry.via({:persistence_writer, document_name}) + ) + + # Trigger checkpoint creation by sending the message directly + send(persistence_writer, :create_checkpoint) + + # Wait for the checkpoint to be created + Process.sleep(100) + + # Verify checkpoint was created + checkpoint = + Repo.one( + from d in DocumentState, + where: + d.document_name == ^document_name and d.version == ^"checkpoint", + order_by: [desc: d.inserted_at], + limit: 1 + ) + + assert checkpoint != nil, "Checkpoint should have been created" + + # Verify checkpoint contains valid Y.Doc data by loading it + checkpoint_doc = Yex.Doc.new() + :ok = Yex.apply_update(checkpoint_doc, checkpoint.state_data) + + checkpoint_workflow_map = Yex.Doc.get_map(checkpoint_doc, "workflow") + + # Verify all data is present in checkpoint + assert Yex.Map.fetch!(checkpoint_workflow_map, "id") == workflow.id + assert Yex.Map.fetch!(checkpoint_workflow_map, "name") == "Test Workflow" + assert Yex.Map.fetch!(checkpoint_workflow_map, "lock_version") == 1 + assert Yex.Map.fetch!(checkpoint_workflow_map, "concurrency") == 10 + + # Clean up + GenServer.stop(persistence_writer, :normal) + end + + test "creates checkpoint merging existing checkpoint with updates", %{ + workflow: workflow + } do + document_name = "workflow:#{workflow.id}" + + # Create initial state + doc = Yex.Doc.new() + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "initial", fn -> + Yex.Map.set(workflow_map, "id", workflow.id) + Yex.Map.set(workflow_map, "name", "Initial Name") + Yex.Map.set(workflow_map, "lock_version", 1) + end) + + {:ok, checkpoint_data} = Yex.encode_state_as_update(doc) + + # Insert as existing checkpoint + {:ok, _} = + Repo.insert(%DocumentState{ + document_name: document_name, + state_data: checkpoint_data, + version: :checkpoint + }) + + # Create a delta update that modifies data + Yex.Doc.transaction(doc, "modify", fn -> + Yex.Map.set(workflow_map, "name", "Modified Name") + Yex.Map.set(workflow_map, "lock_version", 2) + end) + + {:ok, update_data} = Yex.encode_state_as_update(doc) + + # Wait a bit so inserted_at is different + Process.sleep(10) + + {:ok, _} = + Repo.insert(%DocumentState{ + document_name: document_name, + state_data: update_data, + version: :update + }) + + # Start PersistenceWriter and trigger checkpoint + {:ok, persistence_writer} = + PersistenceWriter.start_link( + document_name: document_name, + name: Registry.via({:persistence_writer, document_name}) + ) + + send(persistence_writer, :create_checkpoint) + Process.sleep(100) + + # Get the new checkpoint (should be the most recent one) + checkpoints = + Repo.all( + from d in DocumentState, + where: + d.document_name == ^document_name and d.version == ^"checkpoint", + order_by: [desc: d.inserted_at] + ) + + # Should have 2 checkpoints now + assert length(checkpoints) == 2 + + # Latest checkpoint should have merged data + [latest_checkpoint | _] = checkpoints + new_doc = Yex.Doc.new() + :ok = Yex.apply_update(new_doc, latest_checkpoint.state_data) + + new_workflow_map = Yex.Doc.get_map(new_doc, "workflow") + + # Verify merged data + assert Yex.Map.fetch!(new_workflow_map, "id") == workflow.id + assert Yex.Map.fetch!(new_workflow_map, "name") == "Modified Name" + assert Yex.Map.fetch!(new_workflow_map, "lock_version") == 2 + + # Clean up + GenServer.stop(persistence_writer, :normal) + end + end end diff --git a/test/lightning/collaboration/session_test.exs b/test/lightning/collaboration/session_test.exs index 0128ba8bb9..82d469de5e 100644 --- a/test/lightning/collaboration/session_test.exs +++ b/test/lightning/collaboration/session_test.exs @@ -1528,5 +1528,164 @@ defmodule Lightning.SessionTest do Session.stop(session2) end + + test "handles persisted Y.Doc with nil lock_version when DB has real version", + %{ + user: user + } do + # This tests the bug fix for issue #4164 + # When a workflow is opened before first save, Y.Doc gets lock_version: nil + # If that state is persisted and the workflow is later saved (getting a real lock_version), + # loading the persisted state would crash because extract_lock_version didn't handle {:ok, nil} + + workflow = insert(:simple_workflow) + doc_name = "workflow:#{workflow.id}" + + # Manually create a persisted document state with lock_version: nil + # This simulates a Y.Doc that was persisted before the workflow was ever saved + doc = Yex.Doc.new() + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "setup_nil_lock_version", fn -> + Yex.Map.set(workflow_map, "id", workflow.id) + Yex.Map.set(workflow_map, "name", "Test Workflow") + Yex.Map.set(workflow_map, "lock_version", nil) + end) + + {:ok, update_data} = Yex.encode_state_as_update(doc) + + Repo.insert!(%DocumentState{ + document_name: doc_name, + state_data: update_data, + version: :update + }) + + # Now start a session - this should NOT crash + # The persistence layer should handle the nil lock_version and reset from DB + {:ok, _doc_supervisor} = + Lightning.Collaborate.start_document( + workflow, + doc_name + ) + + {:ok, session} = + Session.start_link( + user: user, + workflow: workflow, + parent_pid: self(), + document_name: doc_name + ) + + # Verify the session started and lock_version was reconciled from DB + shared_doc = Session.get_doc(session) + workflow_map2 = Yex.Doc.get_map(shared_doc, "workflow") + reconciled_lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") + + # lock_version should now match the database value + assert reconciled_lock_version == workflow.lock_version, + "Expected lock_version #{workflow.lock_version} but got #{reconciled_lock_version}" + + Session.stop(session) + end + + test "merges delta updates with persisted state across save batches", %{ + user: user + } do + # This tests the fix for the merge_updates bug where delta updates + # saved in subsequent batches were applied to an empty doc instead of + # the current persisted state, resulting in data loss. + # + # Scenario: + # 1. First batch persists base state with workflow data and lock_version + # 2. Another change generates a delta update (adding concurrency field) + # 3. Second batch should merge the delta with existing persisted state + # + # Note: We verify concurrency is preserved, not name, because + # reconcile_workflow_metadata updates name from DB after loading. + + workflow = insert(:simple_workflow) + doc_name = "workflow:#{workflow.id}" + + # First, create initial persisted state (simulating first batch) + # Set lock_version to match the workflow's DB lock_version to avoid reset + initial_doc = Yex.Doc.new() + workflow_map = Yex.Doc.get_map(initial_doc, "workflow") + + Yex.Doc.transaction(initial_doc, "initial_state", fn -> + Yex.Map.set(workflow_map, "id", workflow.id) + Yex.Map.set(workflow_map, "name", workflow.name) + Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) + end) + + {:ok, initial_update} = Yex.encode_state_as_update(initial_doc) + + # Persist the initial state + Repo.insert!(%DocumentState{ + document_name: doc_name, + state_data: initial_update, + version: :update + }) + + # Now create a delta update that adds a new field + # (simulating what happens when user edits concurrency in the workflow) + delta_doc = Yex.Doc.new() + delta_workflow_map = Yex.Doc.get_map(delta_doc, "workflow") + + # Apply initial state first, then make delta change + Yex.apply_update(delta_doc, initial_update) + + Yex.Doc.transaction(delta_doc, "add_concurrency", fn -> + Yex.Map.set(delta_workflow_map, "concurrency", 5) + end) + + # Get the delta (diff since initial state) + {:ok, state_vector} = Yex.encode_state_vector(initial_doc) + {:ok, delta_update} = Yex.encode_state_as_update(delta_doc, state_vector) + + # The delta should be small (just the concurrency change) + assert byte_size(delta_update) < byte_size(initial_update) + + # Persist the delta as a second update + Repo.insert!(%DocumentState{ + document_name: doc_name, + state_data: delta_update, + version: :update + }) + + # Now start a session - this will load and reconstruct the full state + {:ok, _doc_supervisor} = + Lightning.Collaborate.start_document( + workflow, + doc_name + ) + + {:ok, session} = + Session.start_link( + user: user, + workflow: workflow, + parent_pid: self(), + document_name: doc_name + ) + + # Verify the session loaded the full state correctly + shared_doc = Session.get_doc(session) + loaded_workflow_map = Yex.Doc.get_map(shared_doc, "workflow") + + # The key assertion: concurrency from the delta update should be preserved. + # This verifies the fix - before the fix, delta updates applied to an empty + # doc would result in data loss (concurrency would be missing). + loaded_concurrency = Yex.Map.fetch!(loaded_workflow_map, "concurrency") + assert loaded_concurrency == 5 + + # Name should match DB (reconciliation updates it) + loaded_name = Yex.Map.fetch!(loaded_workflow_map, "name") + assert loaded_name == workflow.name + + # lock_version should match DB (reconciliation updates it) + loaded_lock_version = Yex.Map.fetch!(loaded_workflow_map, "lock_version") + assert loaded_lock_version == workflow.lock_version + + Session.stop(session) + end end end