Skip to content
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ and this project adheres to

### Fixed

- Fixed AI chat session persistence when switching between jobs in workflow
editor [#3745](https://github.com/OpenFn/lightning/issues/3745)

## [2.15.0-pre] - 2025-11-20

### Added
Expand Down
97 changes: 79 additions & 18 deletions lib/lightning/ai_assistant/ai_assistant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,49 @@ defmodule Lightning.AiAssistant do
|> handle_transaction_result()
end

defp session_query(session_id, opts \\ []) do
job_id = Keyword.get(opts, :job_id)

query = from(s in ChatSession, where: s.id == ^session_id)

if job_id do
where(query, [s], s.job_id == ^job_id)
else
query
end
end

defp preload_session(session) do
session =
Repo.preload(session, [:user, messages: {session_messages_query(), :user}])

if session.session_type == "workflow_template" do
Repo.preload(session, :project)
else
session
end
end

@doc """
Gets a session by ID.

Returns the session or `nil` if not found.

## Examples

iex> get_session(session_id)
%ChatSession{}

iex> get_session("nonexistent")
nil
"""
def get_session(session_id) when is_binary(session_id) do
case session_query(session_id) |> Repo.one() do
nil -> nil
session -> preload_session(session)
end
end

@doc """
Retrieves a chat session by ID with all related data preloaded.

Expand All @@ -261,29 +304,47 @@ defmodule Lightning.AiAssistant do

`Ecto.NoResultsError` if no session exists with the given ID.
"""
def get_session!(id) do
session =
ChatSession
|> Repo.get!(id)
|> Repo.preload([:user, messages: {session_messages_query(), :user}])
def get_session!(session_id) when is_binary(session_id) do
session_query(session_id)
|> Repo.one!()
|> preload_session()
end

if session.session_type == "workflow_template" do
Repo.preload(session, :project)
else
session
@doc """
Gets a session scoped to a specific job.

Returns the session or `nil` if not found or doesn't belong to the job.

## Examples

iex> get_session(session_id, job)
%ChatSession{}

iex> get_session(session_id, different_job)
nil
"""
def get_session(session_id, %Job{id: job_id}) do
case session_query(session_id, job_id: job_id) |> Repo.one() do
nil -> nil
session -> preload_session(session)
end
end

def get_session(id) do
case Repo.get(ChatSession, id) do
nil ->
{:error, :not_found}
@doc """
Gets a session scoped to a specific job, raising if not found.

session ->
{:ok,
session
|> Repo.preload([:user, messages: {session_messages_query(), :user}])}
end
## Examples

iex> get_session!(session_id, job)
%ChatSession{}

iex> get_session!(session_id, different_job)
** (Ecto.NoResultsError)
"""
def get_session!(session_id, %Job{id: job_id}) do
session_query(session_id, job_id: job_id)
|> Repo.one!()
|> preload_session()
end

defp session_messages_query do
Expand Down
48 changes: 36 additions & 12 deletions lib/lightning_web/live/ai_assistant/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ defmodule LightningWeb.AiAssistant.Component do
|> AiAssistant.retry_message()
|> case do
{:ok, {_message, _oban_job}} ->
{:ok, session} = AiAssistant.get_session(socket.assigns.session.id)
session = AiAssistant.get_session!(socket.assigns.session.id)

{:noreply,
socket
Expand Down Expand Up @@ -388,27 +388,39 @@ defmodule LightningWeb.AiAssistant.Component do

case action do
:new ->
socket
|> delegate_to_handler(:on_session_close)
|> assign_async([:all_sessions, :pagination_meta], fn ->
case handler.list_sessions(assigns, sort_direction,
limit: @default_page_size
) do
%{sessions: sessions, pagination: pagination} ->
{:ok, %{all_sessions: sessions, pagination_meta: pagination}}
end
end)
load_and_show_session_list(socket, handler, assigns, sort_direction)

:show ->
session = handler.get_session!(assigns)
load_and_show_session(socket, handler, assigns)
end
end

defp load_and_show_session_list(socket, handler, assigns, sort_direction) do
socket
|> delegate_to_handler(:on_session_close)
|> assign(:action, :new)
|> assign(:session, nil)
|> assign_async([:all_sessions, :pagination_meta], fn ->
case handler.list_sessions(assigns, sort_direction,
limit: @default_page_size
) do
%{sessions: sessions, pagination: pagination} ->
{:ok, %{all_sessions: sessions, pagination_meta: pagination}}
end
end)
end

defp load_and_show_session(socket, handler, assigns) do
case fetch_session(handler, assigns) do
{:ok, session} ->
message_loading =
Enum.any?(session.messages, fn msg ->
msg.role == :user && msg.status in [:pending, :processing]
end)

socket
|> assign(:session, session)
|> assign(:action, :show)
|> assign(
:pending_message,
if message_loading do
Expand All @@ -418,9 +430,21 @@ defmodule LightningWeb.AiAssistant.Component do
end
)
|> delegate_to_handler(:on_session_open, [session])

:error ->
socket
|> apply_action(:new)
|> assign(:action, :new)
|> assign(:session, nil)
end
end

defp fetch_session(handler, assigns) do
{:ok, handler.get_session!(assigns)}
rescue
Ecto.NoResultsError -> :error
end

defp save_message(socket, action, content) do
result =
case action do
Expand Down
2 changes: 1 addition & 1 deletion lib/lightning_web/live/ai_assistant/modes/job_code.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ defmodule LightningWeb.Live.AiAssistant.Modes.JobCode do
@impl true
@spec get_session!(assigns()) :: session()
def get_session!(%{chat_session_id: session_id, selected_job: job} = assigns) do
AiAssistant.get_session!(session_id)
AiAssistant.get_session!(session_id, job)
|> AiAssistant.put_expression_and_adaptor(job.body, job.adaptor)
|> maybe_add_run_logs(job, assigns[:follow_run])
end
Expand Down
18 changes: 14 additions & 4 deletions lib/lightning_web/live/workflow_live/edit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,7 @@ defmodule LightningWeb.WorkflowLive.Edit do
page_title: "",
selected_edge: nil,
selected_job: nil,
last_selected_job: nil,
selected_run: nil,
selected_trigger: nil,
selection_mode: nil,
Expand Down Expand Up @@ -3610,10 +3611,19 @@ defmodule LightningWeb.WorkflowLive.Edit do
end

defp assign_chat_session_id(socket, params) do
job_chat_session_id =
if changed?(socket, :selected_job) &&
not is_nil(socket.assigns[:last_selected_job]) do
nil
else
params["j-chat"]
end

socket
|> assign(
workflow_chat_session_id: params["w-chat"],
job_chat_session_id: params["j-chat"]
job_chat_session_id: job_chat_session_id,
last_selected_job: socket.assigns[:selected_job]
)
end

Expand Down Expand Up @@ -3737,12 +3747,12 @@ defmodule LightningWeb.WorkflowLive.Edit do
})
when is_binary(chat_id) do
case Lightning.AiAssistant.get_session(chat_id) do
{:ok, session} ->
%Lightning.AiAssistant.ChatSession{} = session ->
Lightning.AiAssistant.associate_workflow(session, workflow)

{:error, reason} ->
nil ->
Logger.warning(
"Failed to associate workflow with chat session #{chat_id}: #{inspect(reason)}"
"Failed to associate workflow with chat session #{chat_id}: not found"
)
end
end
Expand Down
Loading