Skip to content

Commit bd14d5b

Browse files
committed
Fix checkpoint creation bugs and add tests
Two bugs fixed in create_checkpoint/1: 1. Used .data instead of .state_data when accessing DocumentState records 2. Used string "checkpoint" instead of atom :checkpoint for Ecto.Enum Added tests to verify checkpoint creation works correctly: - Test creating checkpoint from multiple updates - Test merging existing checkpoint with new updates Both bugs would cause crashes when a document reached 500 updates and triggered automatic checkpoint creation. Closes #4176
1 parent 572df04 commit bd14d5b

File tree

3 files changed

+168
-3
lines changed

3 files changed

+168
-3
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ and this project adheres to
2121

2222
### Fixed
2323

24-
- Fix incorrect field access in checkpoint creation that would crash after 500
25-
document updates [#4176](https://github.com/OpenFn/lightning/issues/4176)
24+
- Fix incorrect field access and version type in checkpoint creation that would
25+
crash after 500 document updates
26+
[#4176](https://github.com/OpenFn/lightning/issues/4176)
2627

2728
## [2.15.0-pre4] - 2025-12-08
2829

lib/lightning/collaboration/persistence_writer.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ defmodule Lightning.Collaboration.PersistenceWriter do
421421

422422
checkpoint = %DocumentState{
423423
document_name: document_name,
424-
version: "checkpoint",
424+
version: :checkpoint,
425425
state_data: checkpoint_data
426426
}
427427

test/lightning/collaboration/document_supervisor_test.exs

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,4 +812,168 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do
812812
verify_cleanup(document_name, workflow_id)
813813
end
814814
end
815+
816+
describe "9. Checkpoint Creation" do
817+
alias Lightning.Collaboration.DocumentState
818+
alias Lightning.Collaboration.PersistenceWriter
819+
820+
test "creates checkpoint from persisted updates", %{workflow: workflow} do
821+
document_name = "workflow:#{workflow.id}"
822+
823+
# Create some initial document state with Y.Doc data
824+
doc = Yex.Doc.new()
825+
workflow_map = Yex.Doc.get_map(doc, "workflow")
826+
827+
Yex.Doc.transaction(doc, "setup", fn ->
828+
Yex.Map.set(workflow_map, "id", workflow.id)
829+
Yex.Map.set(workflow_map, "name", "Test Workflow")
830+
Yex.Map.set(workflow_map, "lock_version", 1)
831+
end)
832+
833+
{:ok, update_data} = Yex.encode_state_as_update(doc)
834+
835+
# Insert as an update (not checkpoint)
836+
{:ok, _} =
837+
Repo.insert(%DocumentState{
838+
document_name: document_name,
839+
state_data: update_data,
840+
version: :update
841+
})
842+
843+
# Add a second update with additional data
844+
Yex.Doc.transaction(doc, "add_concurrency", fn ->
845+
Yex.Map.set(workflow_map, "concurrency", 10)
846+
end)
847+
848+
{:ok, update_data2} = Yex.encode_state_as_update(doc)
849+
850+
{:ok, _} =
851+
Repo.insert(%DocumentState{
852+
document_name: document_name,
853+
state_data: update_data2,
854+
version: :update
855+
})
856+
857+
# Start PersistenceWriter
858+
{:ok, persistence_writer} =
859+
PersistenceWriter.start_link(
860+
document_name: document_name,
861+
name: Registry.via({:persistence_writer, document_name})
862+
)
863+
864+
# Trigger checkpoint creation by sending the message directly
865+
send(persistence_writer, :create_checkpoint)
866+
867+
# Wait for the checkpoint to be created
868+
Process.sleep(100)
869+
870+
# Verify checkpoint was created
871+
checkpoint =
872+
Repo.one(
873+
from d in DocumentState,
874+
where:
875+
d.document_name == ^document_name and d.version == ^"checkpoint",
876+
order_by: [desc: d.inserted_at],
877+
limit: 1
878+
)
879+
880+
assert checkpoint != nil, "Checkpoint should have been created"
881+
882+
# Verify checkpoint contains valid Y.Doc data by loading it
883+
checkpoint_doc = Yex.Doc.new()
884+
:ok = Yex.apply_update(checkpoint_doc, checkpoint.state_data)
885+
886+
checkpoint_workflow_map = Yex.Doc.get_map(checkpoint_doc, "workflow")
887+
888+
# Verify all data is present in checkpoint
889+
assert Yex.Map.fetch!(checkpoint_workflow_map, "id") == workflow.id
890+
assert Yex.Map.fetch!(checkpoint_workflow_map, "name") == "Test Workflow"
891+
assert Yex.Map.fetch!(checkpoint_workflow_map, "lock_version") == 1
892+
assert Yex.Map.fetch!(checkpoint_workflow_map, "concurrency") == 10
893+
894+
# Clean up
895+
GenServer.stop(persistence_writer, :normal)
896+
end
897+
898+
test "creates checkpoint merging existing checkpoint with updates", %{
899+
workflow: workflow
900+
} do
901+
document_name = "workflow:#{workflow.id}"
902+
903+
# Create initial state
904+
doc = Yex.Doc.new()
905+
workflow_map = Yex.Doc.get_map(doc, "workflow")
906+
907+
Yex.Doc.transaction(doc, "initial", fn ->
908+
Yex.Map.set(workflow_map, "id", workflow.id)
909+
Yex.Map.set(workflow_map, "name", "Initial Name")
910+
Yex.Map.set(workflow_map, "lock_version", 1)
911+
end)
912+
913+
{:ok, checkpoint_data} = Yex.encode_state_as_update(doc)
914+
915+
# Insert as existing checkpoint
916+
{:ok, _} =
917+
Repo.insert(%DocumentState{
918+
document_name: document_name,
919+
state_data: checkpoint_data,
920+
version: :checkpoint
921+
})
922+
923+
# Create a delta update that modifies data
924+
Yex.Doc.transaction(doc, "modify", fn ->
925+
Yex.Map.set(workflow_map, "name", "Modified Name")
926+
Yex.Map.set(workflow_map, "lock_version", 2)
927+
end)
928+
929+
{:ok, update_data} = Yex.encode_state_as_update(doc)
930+
931+
# Wait a bit so inserted_at is different
932+
Process.sleep(10)
933+
934+
{:ok, _} =
935+
Repo.insert(%DocumentState{
936+
document_name: document_name,
937+
state_data: update_data,
938+
version: :update
939+
})
940+
941+
# Start PersistenceWriter and trigger checkpoint
942+
{:ok, persistence_writer} =
943+
PersistenceWriter.start_link(
944+
document_name: document_name,
945+
name: Registry.via({:persistence_writer, document_name})
946+
)
947+
948+
send(persistence_writer, :create_checkpoint)
949+
Process.sleep(100)
950+
951+
# Get the new checkpoint (should be the most recent one)
952+
checkpoints =
953+
Repo.all(
954+
from d in DocumentState,
955+
where:
956+
d.document_name == ^document_name and d.version == ^"checkpoint",
957+
order_by: [desc: d.inserted_at]
958+
)
959+
960+
# Should have 2 checkpoints now
961+
assert length(checkpoints) == 2
962+
963+
# Latest checkpoint should have merged data
964+
[latest_checkpoint | _] = checkpoints
965+
new_doc = Yex.Doc.new()
966+
:ok = Yex.apply_update(new_doc, latest_checkpoint.state_data)
967+
968+
new_workflow_map = Yex.Doc.get_map(new_doc, "workflow")
969+
970+
# Verify merged data
971+
assert Yex.Map.fetch!(new_workflow_map, "id") == workflow.id
972+
assert Yex.Map.fetch!(new_workflow_map, "name") == "Modified Name"
973+
assert Yex.Map.fetch!(new_workflow_map, "lock_version") == 2
974+
975+
# Clean up
976+
GenServer.stop(persistence_writer, :normal)
977+
end
978+
end
815979
end

0 commit comments

Comments
 (0)