Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ and this project adheres to
- Fix infinite loading when reopening workflow in collaborative editor due to
delta updates not being merged with persisted state
[#4164](https://github.com/OpenFn/lightning/issues/4164)
- Fix incorrect field access and version type in checkpoint creation that would
crash after 500 document updates
[#4176](https://github.com/OpenFn/lightning/issues/4176)

## [2.15.0-pre4] - 2025-12-08

Expand Down
6 changes: 3 additions & 3 deletions lib/lightning/collaboration/persistence_writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -416,20 +416,20 @@ defmodule Lightning.Collaboration.PersistenceWriter do

# Apply checkpoint first if it exists
if latest_checkpoint do
Yex.apply_update(temp_doc, latest_checkpoint.data)
Yex.apply_update(temp_doc, latest_checkpoint.state_data)
end

# Apply all updates in order
Enum.each(updates, fn update_record ->
Yex.apply_update(temp_doc, update_record.data)
Yex.apply_update(temp_doc, update_record.state_data)
end)

# Create new checkpoint
checkpoint_data = Yex.encode_state_as_update!(temp_doc)

checkpoint = %DocumentState{
document_name: document_name,
version: "checkpoint",
version: :checkpoint,
state_data: checkpoint_data
}

Expand Down
163 changes: 163 additions & 0 deletions test/lightning/collaboration/document_supervisor_test.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
defmodule Lightning.Collaboration.DocumentSupervisorTest do
use Lightning.DataCase, async: false

alias Lightning.Collaboration.DocumentState
alias Lightning.Collaboration.DocumentSupervisor
alias Lightning.Collaboration.PersistenceWriter
alias Lightning.Collaboration.Registry

import Eventually
Expand Down Expand Up @@ -812,4 +814,165 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do
verify_cleanup(document_name, workflow_id)
end
end

describe "9. Checkpoint Creation" do
test "creates checkpoint from persisted updates", %{workflow: workflow} do
document_name = "workflow:#{workflow.id}"

# Create some initial document state with Y.Doc data
doc = Yex.Doc.new()
workflow_map = Yex.Doc.get_map(doc, "workflow")

Yex.Doc.transaction(doc, "setup", fn ->
Yex.Map.set(workflow_map, "id", workflow.id)
Yex.Map.set(workflow_map, "name", "Test Workflow")
Yex.Map.set(workflow_map, "lock_version", 1)
end)

{:ok, update_data} = Yex.encode_state_as_update(doc)

# Insert as an update (not checkpoint)
{:ok, _} =
Repo.insert(%DocumentState{
document_name: document_name,
state_data: update_data,
version: :update
})

# Add a second update with additional data
Yex.Doc.transaction(doc, "add_concurrency", fn ->
Yex.Map.set(workflow_map, "concurrency", 10)
end)

{:ok, update_data2} = Yex.encode_state_as_update(doc)

{:ok, _} =
Repo.insert(%DocumentState{
document_name: document_name,
state_data: update_data2,
version: :update
})

# Start PersistenceWriter
{:ok, persistence_writer} =
PersistenceWriter.start_link(
document_name: document_name,
name: Registry.via({:persistence_writer, document_name})
)

# Trigger checkpoint creation by sending the message directly
send(persistence_writer, :create_checkpoint)

# Wait for the checkpoint to be created
Process.sleep(100)

# Verify checkpoint was created
checkpoint =
Repo.one(
from d in DocumentState,
where:
d.document_name == ^document_name and d.version == ^"checkpoint",
order_by: [desc: d.inserted_at],
limit: 1
)

assert checkpoint != nil, "Checkpoint should have been created"

# Verify checkpoint contains valid Y.Doc data by loading it
checkpoint_doc = Yex.Doc.new()
:ok = Yex.apply_update(checkpoint_doc, checkpoint.state_data)

checkpoint_workflow_map = Yex.Doc.get_map(checkpoint_doc, "workflow")

# Verify all data is present in checkpoint
assert Yex.Map.fetch!(checkpoint_workflow_map, "id") == workflow.id
assert Yex.Map.fetch!(checkpoint_workflow_map, "name") == "Test Workflow"
assert Yex.Map.fetch!(checkpoint_workflow_map, "lock_version") == 1
assert Yex.Map.fetch!(checkpoint_workflow_map, "concurrency") == 10

# Clean up
GenServer.stop(persistence_writer, :normal)
end

test "creates checkpoint merging existing checkpoint with updates", %{
workflow: workflow
} do
document_name = "workflow:#{workflow.id}"

# Create initial state
doc = Yex.Doc.new()
workflow_map = Yex.Doc.get_map(doc, "workflow")

Yex.Doc.transaction(doc, "initial", fn ->
Yex.Map.set(workflow_map, "id", workflow.id)
Yex.Map.set(workflow_map, "name", "Initial Name")
Yex.Map.set(workflow_map, "lock_version", 1)
end)

{:ok, checkpoint_data} = Yex.encode_state_as_update(doc)

# Insert as existing checkpoint
{:ok, _} =
Repo.insert(%DocumentState{
document_name: document_name,
state_data: checkpoint_data,
version: :checkpoint
})

# Create a delta update that modifies data
Yex.Doc.transaction(doc, "modify", fn ->
Yex.Map.set(workflow_map, "name", "Modified Name")
Yex.Map.set(workflow_map, "lock_version", 2)
end)

{:ok, update_data} = Yex.encode_state_as_update(doc)

# Wait a bit so inserted_at is different
Process.sleep(10)

{:ok, _} =
Repo.insert(%DocumentState{
document_name: document_name,
state_data: update_data,
version: :update
})

# Start PersistenceWriter and trigger checkpoint
{:ok, persistence_writer} =
PersistenceWriter.start_link(
document_name: document_name,
name: Registry.via({:persistence_writer, document_name})
)

send(persistence_writer, :create_checkpoint)
Process.sleep(100)

# Get the new checkpoint (should be the most recent one)
checkpoints =
Repo.all(
from d in DocumentState,
where:
d.document_name == ^document_name and d.version == ^"checkpoint",
order_by: [desc: d.inserted_at]
)

# Should have 2 checkpoints now
assert length(checkpoints) == 2

# Latest checkpoint should have merged data
[latest_checkpoint | _] = checkpoints
new_doc = Yex.Doc.new()
:ok = Yex.apply_update(new_doc, latest_checkpoint.state_data)

new_workflow_map = Yex.Doc.get_map(new_doc, "workflow")

# Verify merged data
assert Yex.Map.fetch!(new_workflow_map, "id") == workflow.id
assert Yex.Map.fetch!(new_workflow_map, "name") == "Modified Name"
assert Yex.Map.fetch!(new_workflow_map, "lock_version") == 2

# Clean up
GenServer.stop(persistence_writer, :normal)
end
end
end