Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ and this project adheres to

### Fixed

- Fix infinite loading when reopening workflow in collaborative editor due to
delta updates not being merged with persisted state
[#4164](https://github.com/OpenFn/lightning/issues/4164)
- Fix incorrect field access and version type in checkpoint creation that would
crash after 500 document updates
[#4176](https://github.com/OpenFn/lightning/issues/4176)

## [2.15.0-pre4] - 2025-12-08

### Added
Expand Down
102 changes: 102 additions & 0 deletions lib/lightning/collaboration/document_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@ defmodule Lightning.Collaboration.DocumentState do

use Ecto.Schema
import Ecto.Changeset
import Ecto.Query

alias Lightning.Repo

@type t :: %__MODULE__{
id: integer() | nil,
document_name: String.t() | nil,
state_data: binary() | nil,
state_vector: binary() | nil,
version: :update | :checkpoint | :state_vector | nil,
inserted_at: DateTime.t() | nil,
updated_at: DateTime.t() | nil
}

schema "collaboration_document_states" do
field :document_name, :string
Expand All @@ -30,4 +43,93 @@ defmodule Lightning.Collaboration.DocumentState do
])
|> validate_required([:document_name, :state_data, :version])
end

@doc """
Fetches the persisted state needed to rebuild a document: the most recent
checkpoint plus every update inserted after it.

When no checkpoint exists, the Unix epoch is used as the lower bound for the
update lookup.

Returns `{:ok, checkpoint, updates}` where `checkpoint` may be `nil`, or
`{:error, :not_found}` when neither a checkpoint nor any update exists for
`doc_name`.
"""
@spec get_checkpoint_and_updates(String.t()) ::
        {:ok, __MODULE__.t() | nil, [__MODULE__.t()]} | {:error, :not_found}
def get_checkpoint_and_updates(doc_name) do
  checkpoint = get_latest_checkpoint(doc_name)

  checkpoint_time =
    if checkpoint, do: checkpoint.inserted_at, else: ~U[1970-01-01 00:00:00Z]

  updates = get_updates_since(doc_name, checkpoint_time)

  # Match on shape rather than calling `length/1`, which walks the whole list
  # only to test for emptiness.
  case {checkpoint, updates} do
    {nil, []} -> {:error, :not_found}
    _ -> {:ok, checkpoint, updates}
  end
end

@doc """
Returns the most recently inserted `:checkpoint` record for `doc_name`,
or `nil` when the document has no checkpoint.
"""
@spec get_latest_checkpoint(String.t()) :: __MODULE__.t() | nil
def get_latest_checkpoint(doc_name) do
  __MODULE__
  |> where([s], s.document_name == ^doc_name and s.version == :checkpoint)
  |> order_by([s], desc: s.inserted_at)
  |> limit(1)
  |> Repo.one()
end

@doc """
Retrieves all `:update` records for a document inserted strictly after
`since`, ordered chronologically (oldest first).

Rows that share the same `inserted_at` are further ordered by `id`, so the
result order is deterministic.
"""
@spec get_updates_since(String.t(), DateTime.t()) :: [__MODULE__.t()]
def get_updates_since(doc_name, since) do
  Repo.all(
    from d in __MODULE__,
      where:
        d.document_name == ^doc_name and
          d.version == :update and
          d.inserted_at > ^since,
      # Without a tie-breaker the database may return rows with equal
      # timestamps in any order.
      order_by: [asc: d.inserted_at, asc: d.id]
  )
end

@doc """
Replays persisted state onto a Yex document.

The checkpoint, when given, is applied first; every update is then applied
in the order it appears in `updates`. The results of the individual
`Yex.apply_update/2` calls are not inspected and the function always
returns `:ok`.
"""
@spec apply_to_doc(Yex.Doc.t(), __MODULE__.t() | nil, [__MODULE__.t()]) :: :ok
def apply_to_doc(doc, checkpoint, updates) do
  base = if checkpoint, do: [checkpoint], else: []

  base
  |> Stream.concat(updates)
  |> Enum.each(fn record -> Yex.apply_update(doc, record.state_data) end)
end

@doc """
Loads all persisted state for a document and applies it to a Yex document.

Convenience function that combines `get_checkpoint_and_updates/1` and
`apply_to_doc/3`. When nothing has been persisted for `doc_name`, the
document is left untouched and `:ok` is still returned.
"""
@spec load_into_doc(Yex.Doc.t(), String.t()) :: :ok
def load_into_doc(doc, doc_name) do
  # Delegate the lookup so the checkpoint/update selection rules live in one
  # place instead of being duplicated here.
  case get_checkpoint_and_updates(doc_name) do
    {:ok, checkpoint, updates} -> apply_to_doc(doc, checkpoint, updates)
    {:error, :not_found} -> :ok
  end
end
end
53 changes: 7 additions & 46 deletions lib/lightning/collaboration/persistence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ defmodule Lightning.Collaboration.Persistence do
alias Lightning.Collaboration.DocumentState
alias Lightning.Collaboration.PersistenceWriter
alias Lightning.Collaboration.Session
alias Lightning.Repo

require Logger

Expand All @@ -27,7 +26,7 @@ defmodule Lightning.Collaboration.Persistence do
"""
end

case load_document_records(doc_name) do
case DocumentState.get_checkpoint_and_updates(doc_name) do
{:ok, checkpoint, updates} ->
apply_persisted_state(doc, doc_name, checkpoint, updates)
reconcile_or_reset(doc, doc_name, workflow)
Expand All @@ -46,7 +45,6 @@ defmodule Lightning.Collaboration.Persistence do

@impl true
def update_v1(state, update, doc_name, _doc) do
# Send to PersistenceWriter via state
case PersistenceWriter.add_update(doc_name, update) do
:ok ->
state
Expand Down Expand Up @@ -82,51 +80,12 @@ defmodule Lightning.Collaboration.Persistence do
end

# Private functions
# Loads the persisted records needed to rebuild `doc_name`: the newest
# checkpoint (if any) and the updates inserted after it.
#
# Returns `{:ok, checkpoint, updates}` (checkpoint may be nil) or
# `{:error, :not_found}` when neither a checkpoint nor any update exists.
defp load_document_records(doc_name) do
import Ecto.Query

# Get latest checkpoint
checkpoint =
Repo.one(
from d in DocumentState,
where: d.document_name == ^doc_name and d.version == :checkpoint,
order_by: [desc: d.inserted_at],
limit: 1
)

# With no checkpoint, fall back to the Unix epoch so every update qualifies.
checkpoint_time =
if checkpoint, do: checkpoint.inserted_at, else: ~U[1970-01-01 00:00:00Z]

# Get updates after checkpoint
updates =
Repo.all(
from d in DocumentState,
where:
d.document_name == ^doc_name and
d.version == :update and
d.inserted_at > ^checkpoint_time,
order_by: [asc: d.inserted_at]
)

# Report :not_found only when there is neither a checkpoint nor any update.
if checkpoint || length(updates) > 0 do
{:ok, checkpoint, updates}
else
{:error, :not_found}
end
end

defp apply_persisted_state(doc, doc_name, checkpoint, updates) do
# Apply checkpoint first if exists
if checkpoint do
Logger.info("Applying checkpoint to document. document=#{doc_name}")
Yex.apply_update(doc, checkpoint.state_data)
end

# Then apply updates in chronological order
Enum.each(updates, fn update ->
Yex.apply_update(doc, update.state_data)
end)

DocumentState.apply_to_doc(doc, checkpoint, updates)
Logger.debug("Loaded #{length(updates)} updates. document=#{doc_name}")
end

Expand Down Expand Up @@ -157,13 +116,15 @@ defmodule Lightning.Collaboration.Persistence do
case Yex.Map.fetch(workflow_map, "lock_version") do
{:ok, version} when is_float(version) -> trunc(version)
{:ok, version} when is_integer(version) -> version
{:ok, nil} -> nil
:error -> nil
end
end

defp stale?(persisted_version, current_version) do
persisted_version != current_version and not is_nil(persisted_version)
end
defp stale?(nil, current_version), do: not is_nil(current_version)

defp stale?(persisted_version, current_version),
do: persisted_version != current_version

defp clear_and_reset_workflow(doc, workflow) do
# Same pattern as Session.clear_and_reset_doc
Expand Down
28 changes: 18 additions & 10 deletions lib/lightning/collaboration/persistence_writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,10 @@ defmodule Lightning.Collaboration.PersistenceWriter do
"Saving #{length(state.pending_updates)} pending updates for document: #{state.document_name}"
)

# Merge multiple updates if we have more than one
merged_update = merge_updates(state.pending_updates)
# Merge multiple updates if we have more than one.
# Pass document_name so merge_updates can load existing persisted state
# before applying deltas (Yjs deltas require the base state to merge correctly).
merged_update = merge_updates(state.pending_updates, state.document_name)

document_state = %DocumentState{
document_name: state.document_name,
Expand All @@ -343,7 +345,7 @@ defmodule Lightning.Collaboration.PersistenceWriter do
}

case Repo.insert(document_state) do
{:ok, _} ->
{:ok, _inserted} ->
{:ok, length(state.pending_updates)}

{:error, reason} ->
Expand All @@ -355,13 +357,19 @@ defmodule Lightning.Collaboration.PersistenceWriter do
{:error, exception}
end

defp merge_updates([update]), do: update
defp merge_updates([update], _document_name), do: update
Copy link
Contributor

@doc-han doc-han Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix for my issue #4154 overlaps with what you're doing here. I want to get rid of this function altogether. Having only a single update doesn't mean that

  1. we don't want to generate a delta between the current state of the document(_document_name)
  2. also, properly encode the changes using Yex.encode_state_as_update!

Regarding point 2: I don't think that just returning the single update is the same as passing it through a new doc and encoding it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On my end, I've consistently reproduced the error whenever a single update comes through the ydoc, which leads to this function. I'm checking whether I can reproduce it on this branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome. I think this PR also fixes #4154. I just revalidated whether a single update coming through causes an issue, and it actually doesn't. Hence the issue in #4154 also had to do with the corrupted bytes. It took me a long route to get to the merge_updates function.


defp merge_updates(updates) when length(updates) > 1 do
# Create a temporary document and apply all updates in reverse order
# (since we store them in reverse chronological order)
defp merge_updates(updates, document_name) when length(updates) > 1 do
# Load the current persisted state first, then apply the new deltas.
# This is necessary because Yjs delta updates only contain changes,
# not the full state. Without the base state, applying deltas to an
# empty document produces an empty result.
temp_doc = Yex.Doc.new()

# Load existing persisted state (checkpoint + any previously saved updates)
DocumentState.load_into_doc(temp_doc, document_name)

# Apply new updates in chronological order (oldest first)
updates
|> Enum.reverse()
|> Enum.each(fn update ->
Expand Down Expand Up @@ -408,20 +416,20 @@ defmodule Lightning.Collaboration.PersistenceWriter do

# Apply checkpoint first if it exists
if latest_checkpoint do
Yex.apply_update(temp_doc, latest_checkpoint.data)
Yex.apply_update(temp_doc, latest_checkpoint.state_data)
end

# Apply all updates in order
Enum.each(updates, fn update_record ->
Yex.apply_update(temp_doc, update_record.data)
Yex.apply_update(temp_doc, update_record.state_data)
end)

# Create new checkpoint
checkpoint_data = Yex.encode_state_as_update!(temp_doc)

checkpoint = %DocumentState{
document_name: document_name,
version: "checkpoint",
version: :checkpoint,
state_data: checkpoint_data
}

Expand Down
Loading