diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f906d42b0..051253a82b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,10 @@ and this project adheres to ### Changed +- Support a batch of logs submitted to the `run:log` channel by the worker + [#4123](https://github.com/OpenFn/lightning/issues/4123) (backwards + compatible) + ### Fixed - Fix new jobs are misplaced on the canvas in manual layout diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 84153b4ece..a25978cfd3 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -262,28 +262,44 @@ defmodule Lightning.Runs do end def append_run_log(run, params, scrubber \\ nil) do - LogLine.new(run, params, scrubber) - |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> - if is_nil(step_id) do - [] - else - where(Lightning.RunStep, step_id: ^step_id, run_id: ^run.id) - |> Repo.exists?() - |> if do - [] - else - [{:step_id, "must be associated with the run"}] - end + log_entries = + case Map.get(params, :logs) || Map.get(params, "logs") do + logs when is_list(logs) -> logs + nil -> [params] end - end) - |> Repo.insert() - |> case do - {:ok, log_line} -> - Events.log_appended(log_line) - {:ok, log_line} - {:error, changeset} -> - {:error, changeset} + results = + Enum.map(log_entries, fn log_entry -> + IO.inspect(log_entry, label: "log object in append_run_log") + + LogLine.new(run, log_entry, scrubber) + |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> + if is_nil(step_id) do + [] + else + where(Lightning.RunStep, step_id: ^step_id, run_id: ^run.id) + |> Repo.exists?() + |> if do + [] + else + [{:step_id, "must be associated with the run"}] + end + end + end) + |> Repo.insert() + |> case do + {:ok, log_line} -> + Events.log_appended(log_line) + {:ok, log_line} + + {:error, changeset} -> + {:error, changeset} + end + end) + + case Enum.find(results, fn result -> match?({:error, _}, result) end) do + nil -> :ok + error -> error end end diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index a8a6d4fe5d..c1d018fe4e 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -219,8 +219,8 @@ defmodule LightningWeb.RunChannel do {:error, changeset} -> reply_with(socket, {:error, changeset}) - {:ok, log_line} -> - reply_with(socket, {:ok, %{log_line_id: log_line.id}}) + :ok -> + reply_with(socket, :ok) end end