diff --git a/CHANGELOG.md b/CHANGELOG.md index b381aef44b..79b4c14e66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,9 @@ and this project adheres to - Fix infinite loading when reopening workflow in collaborative editor due to delta updates not being merged with persisted state [#4164](https://github.com/OpenFn/lightning/issues/4164) +- Fix incorrect field access and version type in checkpoint creation that would + crash after 500 document updates + [#4176](https://github.com/OpenFn/lightning/issues/4176) ## [2.15.0-pre4] - 2025-12-08 diff --git a/lib/lightning/collaboration/persistence_writer.ex b/lib/lightning/collaboration/persistence_writer.ex index fb462caf65..40af51b8e7 100644 --- a/lib/lightning/collaboration/persistence_writer.ex +++ b/lib/lightning/collaboration/persistence_writer.ex @@ -416,12 +416,12 @@ defmodule Lightning.Collaboration.PersistenceWriter do # Apply checkpoint first if it exists if latest_checkpoint do - Yex.apply_update(temp_doc, latest_checkpoint.data) + Yex.apply_update(temp_doc, latest_checkpoint.state_data) end # Apply all updates in order Enum.each(updates, fn update_record -> - Yex.apply_update(temp_doc, update_record.data) + Yex.apply_update(temp_doc, update_record.state_data) end) # Create new checkpoint @@ -429,7 +429,7 @@ defmodule Lightning.Collaboration.PersistenceWriter do checkpoint = %DocumentState{ document_name: document_name, - version: "checkpoint", + version: :checkpoint, state_data: checkpoint_data } diff --git a/test/lightning/collaboration/document_supervisor_test.exs b/test/lightning/collaboration/document_supervisor_test.exs index 5c1f4757a2..1b5e881a95 100644 --- a/test/lightning/collaboration/document_supervisor_test.exs +++ b/test/lightning/collaboration/document_supervisor_test.exs @@ -1,7 +1,9 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do use Lightning.DataCase, async: false + alias Lightning.Collaboration.DocumentState alias Lightning.Collaboration.DocumentSupervisor + alias Lightning.Collaboration.PersistenceWriter alias Lightning.Collaboration.Registry import Eventually @@ -812,4 +814,165 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do verify_cleanup(document_name, workflow_id) end end + + describe "9. Checkpoint Creation" do + test "creates checkpoint from persisted updates", %{workflow: workflow} do + document_name = "workflow:#{workflow.id}" + + # Create some initial document state with Y.Doc data + doc = Yex.Doc.new() + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "setup", fn -> + Yex.Map.set(workflow_map, "id", workflow.id) + Yex.Map.set(workflow_map, "name", "Test Workflow") + Yex.Map.set(workflow_map, "lock_version", 1) + end) + + {:ok, update_data} = Yex.encode_state_as_update(doc) + + # Insert as an update (not checkpoint) + {:ok, _} = + Repo.insert(%DocumentState{ + document_name: document_name, + state_data: update_data, + version: :update + }) + + # Add a second update with additional data + Yex.Doc.transaction(doc, "add_concurrency", fn -> + Yex.Map.set(workflow_map, "concurrency", 10) + end) + + {:ok, update_data2} = Yex.encode_state_as_update(doc) + + {:ok, _} = + Repo.insert(%DocumentState{ + document_name: document_name, + state_data: update_data2, + version: :update + }) + + # Start PersistenceWriter + {:ok, persistence_writer} = + PersistenceWriter.start_link( + document_name: document_name, + name: Registry.via({:persistence_writer, document_name}) + ) + + # Trigger checkpoint creation by sending the message directly + send(persistence_writer, :create_checkpoint) + + # Wait for the checkpoint to be created + Process.sleep(100) + + # Verify checkpoint was created + checkpoint = + Repo.one( + from d in DocumentState, + where: + d.document_name == ^document_name and d.version == ^"checkpoint", + order_by: [desc: d.inserted_at], + limit: 1 + ) + + assert checkpoint != nil, "Checkpoint should have been created" + + # Verify checkpoint contains valid Y.Doc data by loading it + checkpoint_doc = Yex.Doc.new() + :ok = Yex.apply_update(checkpoint_doc, checkpoint.state_data) + + checkpoint_workflow_map = Yex.Doc.get_map(checkpoint_doc, "workflow") + + # Verify all data is present in checkpoint + assert Yex.Map.fetch!(checkpoint_workflow_map, "id") == workflow.id + assert Yex.Map.fetch!(checkpoint_workflow_map, "name") == "Test Workflow" + assert Yex.Map.fetch!(checkpoint_workflow_map, "lock_version") == 1 + assert Yex.Map.fetch!(checkpoint_workflow_map, "concurrency") == 10 + + # Clean up + GenServer.stop(persistence_writer, :normal) + end + + test "creates checkpoint merging existing checkpoint with updates", %{ + workflow: workflow + } do + document_name = "workflow:#{workflow.id}" + + # Create initial state + doc = Yex.Doc.new() + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "initial", fn -> + Yex.Map.set(workflow_map, "id", workflow.id) + Yex.Map.set(workflow_map, "name", "Initial Name") + Yex.Map.set(workflow_map, "lock_version", 1) + end) + + {:ok, checkpoint_data} = Yex.encode_state_as_update(doc) + + # Insert as existing checkpoint + {:ok, _} = + Repo.insert(%DocumentState{ + document_name: document_name, + state_data: checkpoint_data, + version: :checkpoint + }) + + # Create a delta update that modifies data + Yex.Doc.transaction(doc, "modify", fn -> + Yex.Map.set(workflow_map, "name", "Modified Name") + Yex.Map.set(workflow_map, "lock_version", 2) + end) + + {:ok, update_data} = Yex.encode_state_as_update(doc) + + # Wait a bit so inserted_at is different + Process.sleep(10) + + {:ok, _} = + Repo.insert(%DocumentState{ + document_name: document_name, + state_data: update_data, + version: :update + }) + + # Start PersistenceWriter and trigger checkpoint + {:ok, persistence_writer} = + PersistenceWriter.start_link( + document_name: document_name, + name: Registry.via({:persistence_writer, document_name}) + ) + + send(persistence_writer, :create_checkpoint) + Process.sleep(100) + + # Get the new checkpoint (should be the most recent one) + checkpoints = + Repo.all( + from d in DocumentState, + where: + d.document_name == ^document_name and d.version == ^"checkpoint", + order_by: [desc: d.inserted_at] + ) + + # Should have 2 checkpoints now + assert length(checkpoints) == 2 + + # Latest checkpoint should have merged data + [latest_checkpoint | _] = checkpoints + new_doc = Yex.Doc.new() + :ok = Yex.apply_update(new_doc, latest_checkpoint.state_data) + + new_workflow_map = Yex.Doc.get_map(new_doc, "workflow") + + # Verify merged data + assert Yex.Map.fetch!(new_workflow_map, "id") == workflow.id + assert Yex.Map.fetch!(new_workflow_map, "name") == "Modified Name" + assert Yex.Map.fetch!(new_workflow_map, "lock_version") == 2 + + # Clean up + GenServer.stop(persistence_writer, :normal) + end + end end