Skip to content

Commit 6fd1ec1

Browse files
committed
Fix checkpoint creation bugs and add tests
Two bugs fixed in create_checkpoint/1: 1. Used .data instead of .state_data when accessing DocumentState records 2. Used string "checkpoint" instead of atom :checkpoint for Ecto.Enum Added tests to verify checkpoint creation works correctly: - Test creating checkpoint from multiple updates - Test merging existing checkpoint with new updates Both bugs would cause crashes when a document reached 500 updates and triggered automatic checkpoint creation. Closes #4176
1 parent 572df04 commit 6fd1ec1

File tree

3 files changed

+167
-3
lines changed

3 files changed

+167
-3
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ and this project adheres to
2121

2222
### Fixed
2323

24-
- Fix incorrect field access in checkpoint creation that would crash after 500
25-
document updates [#4176](https://github.com/OpenFn/lightning/issues/4176)
24+
- Fix incorrect field access and version type in checkpoint creation that would
25+
crash after 500 document updates
26+
[#4176](https://github.com/OpenFn/lightning/issues/4176)
2627

2728
## [2.15.0-pre4] - 2025-12-08
2829

lib/lightning/collaboration/persistence_writer.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ defmodule Lightning.Collaboration.PersistenceWriter do
421421

422422
checkpoint = %DocumentState{
423423
document_name: document_name,
424-
version: "checkpoint",
424+
version: :checkpoint,
425425
state_data: checkpoint_data
426426
}
427427

test/lightning/collaboration/document_supervisor_test.exs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
defmodule Lightning.Collaboration.DocumentSupervisorTest do
22
use Lightning.DataCase, async: false
33

4+
alias Lightning.Collaboration.DocumentState
45
alias Lightning.Collaboration.DocumentSupervisor
6+
alias Lightning.Collaboration.PersistenceWriter
57
alias Lightning.Collaboration.Registry
68

79
import Eventually
@@ -812,4 +814,165 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do
812814
verify_cleanup(document_name, workflow_id)
813815
end
814816
end
817+
818+
describe "9. Checkpoint Creation" do
819+
test "creates checkpoint from persisted updates", %{workflow: workflow} do
820+
document_name = "workflow:#{workflow.id}"
821+
822+
# Create some initial document state with Y.Doc data
823+
doc = Yex.Doc.new()
824+
workflow_map = Yex.Doc.get_map(doc, "workflow")
825+
826+
Yex.Doc.transaction(doc, "setup", fn ->
827+
Yex.Map.set(workflow_map, "id", workflow.id)
828+
Yex.Map.set(workflow_map, "name", "Test Workflow")
829+
Yex.Map.set(workflow_map, "lock_version", 1)
830+
end)
831+
832+
{:ok, update_data} = Yex.encode_state_as_update(doc)
833+
834+
# Insert as an update (not checkpoint)
835+
{:ok, _} =
836+
Repo.insert(%DocumentState{
837+
document_name: document_name,
838+
state_data: update_data,
839+
version: :update
840+
})
841+
842+
# Add a second update with additional data
843+
Yex.Doc.transaction(doc, "add_concurrency", fn ->
844+
Yex.Map.set(workflow_map, "concurrency", 10)
845+
end)
846+
847+
{:ok, update_data2} = Yex.encode_state_as_update(doc)
848+
849+
{:ok, _} =
850+
Repo.insert(%DocumentState{
851+
document_name: document_name,
852+
state_data: update_data2,
853+
version: :update
854+
})
855+
856+
# Start PersistenceWriter
857+
{:ok, persistence_writer} =
858+
PersistenceWriter.start_link(
859+
document_name: document_name,
860+
name: Registry.via({:persistence_writer, document_name})
861+
)
862+
863+
# Trigger checkpoint creation by sending the message directly
864+
send(persistence_writer, :create_checkpoint)
865+
866+
# Wait for the checkpoint to be created
867+
Process.sleep(100)
868+
869+
# Verify checkpoint was created
870+
checkpoint =
871+
Repo.one(
872+
from d in DocumentState,
873+
where:
874+
d.document_name == ^document_name and d.version == ^"checkpoint",
875+
order_by: [desc: d.inserted_at],
876+
limit: 1
877+
)
878+
879+
assert checkpoint != nil, "Checkpoint should have been created"
880+
881+
# Verify checkpoint contains valid Y.Doc data by loading it
882+
checkpoint_doc = Yex.Doc.new()
883+
:ok = Yex.apply_update(checkpoint_doc, checkpoint.state_data)
884+
885+
checkpoint_workflow_map = Yex.Doc.get_map(checkpoint_doc, "workflow")
886+
887+
# Verify all data is present in checkpoint
888+
assert Yex.Map.fetch!(checkpoint_workflow_map, "id") == workflow.id
889+
assert Yex.Map.fetch!(checkpoint_workflow_map, "name") == "Test Workflow"
890+
assert Yex.Map.fetch!(checkpoint_workflow_map, "lock_version") == 1
891+
assert Yex.Map.fetch!(checkpoint_workflow_map, "concurrency") == 10
892+
893+
# Clean up
894+
GenServer.stop(persistence_writer, :normal)
895+
end
896+
897+
test "creates checkpoint merging existing checkpoint with updates", %{
898+
workflow: workflow
899+
} do
900+
document_name = "workflow:#{workflow.id}"
901+
902+
# Create initial state
903+
doc = Yex.Doc.new()
904+
workflow_map = Yex.Doc.get_map(doc, "workflow")
905+
906+
Yex.Doc.transaction(doc, "initial", fn ->
907+
Yex.Map.set(workflow_map, "id", workflow.id)
908+
Yex.Map.set(workflow_map, "name", "Initial Name")
909+
Yex.Map.set(workflow_map, "lock_version", 1)
910+
end)
911+
912+
{:ok, checkpoint_data} = Yex.encode_state_as_update(doc)
913+
914+
# Insert as existing checkpoint
915+
{:ok, _} =
916+
Repo.insert(%DocumentState{
917+
document_name: document_name,
918+
state_data: checkpoint_data,
919+
version: :checkpoint
920+
})
921+
922+
# Create a delta update that modifies data
923+
Yex.Doc.transaction(doc, "modify", fn ->
924+
Yex.Map.set(workflow_map, "name", "Modified Name")
925+
Yex.Map.set(workflow_map, "lock_version", 2)
926+
end)
927+
928+
{:ok, update_data} = Yex.encode_state_as_update(doc)
929+
930+
# Wait a bit so inserted_at is different
931+
Process.sleep(10)
932+
933+
{:ok, _} =
934+
Repo.insert(%DocumentState{
935+
document_name: document_name,
936+
state_data: update_data,
937+
version: :update
938+
})
939+
940+
# Start PersistenceWriter and trigger checkpoint
941+
{:ok, persistence_writer} =
942+
PersistenceWriter.start_link(
943+
document_name: document_name,
944+
name: Registry.via({:persistence_writer, document_name})
945+
)
946+
947+
send(persistence_writer, :create_checkpoint)
948+
Process.sleep(100)
949+
950+
# Get the new checkpoint (should be the most recent one)
951+
checkpoints =
952+
Repo.all(
953+
from d in DocumentState,
954+
where:
955+
d.document_name == ^document_name and d.version == ^"checkpoint",
956+
order_by: [desc: d.inserted_at]
957+
)
958+
959+
# Should have 2 checkpoints now
960+
assert length(checkpoints) == 2
961+
962+
# Latest checkpoint should have merged data
963+
[latest_checkpoint | _] = checkpoints
964+
new_doc = Yex.Doc.new()
965+
:ok = Yex.apply_update(new_doc, latest_checkpoint.state_data)
966+
967+
new_workflow_map = Yex.Doc.get_map(new_doc, "workflow")
968+
969+
# Verify merged data
970+
assert Yex.Map.fetch!(new_workflow_map, "id") == workflow.id
971+
assert Yex.Map.fetch!(new_workflow_map, "name") == "Modified Name"
972+
assert Yex.Map.fetch!(new_workflow_map, "lock_version") == 2
973+
974+
# Clean up
975+
GenServer.stop(persistence_writer, :normal)
976+
end
977+
end
815978
end

0 commit comments

Comments
 (0)