diff --git a/lib/phoenix/sync/controller.ex b/lib/phoenix/sync/controller.ex index ed19f65..caf5a6e 100644 --- a/lib/phoenix/sync/controller.ex +++ b/lib/phoenix/sync/controller.ex @@ -231,57 +231,57 @@ defmodule Phoenix.Sync.Controller do end defp sync_render_call(conn, api, params, predefined_shape) do - {:ok, shape_api} = Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, predefined_shape) - - Phoenix.Sync.Adapter.PlugApi.call(shape_api, CORS.call(conn), params) + Phoenix.Sync.Electric.api_predefined_shape(conn, api, predefined_shape, fn conn, shape_api -> + Phoenix.Sync.Adapter.PlugApi.call(shape_api, CORS.call(conn), params) + end) end defp interruptible_call(conn, api, params, shape_fun) do predefined_shape = call_shape_fun(shape_fun) - {:ok, shape_api} = Adapter.PlugApi.predefined_shape(api, predefined_shape) + Phoenix.Sync.Electric.api_predefined_shape(conn, api, predefined_shape, fn conn, shape_api -> + {:ok, key} = ShapeRequestRegistry.register_shape(predefined_shape) - {:ok, key} = ShapeRequestRegistry.register_shape(predefined_shape) + try do + parent = self() + start_time = now() - try do - parent = self() - start_time = now() - - {:ok, pid} = - Task.start_link(fn -> - send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)}) - end) + {:ok, pid} = + Task.start_link(fn -> + send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)}) + end) - ref = Process.monitor(pid) + ref = Process.monitor(pid) - receive do - {:interrupt_shape, ^key, :server_interrupt} -> - Process.demonitor(ref, [:flush]) - Process.unlink(pid) - Process.exit(pid, :kill) - # immediately retry the same request -- if the shape_fun returns a - # different shape the client will receive a must-refetch response but - # if the shape is the same then the request will continue with no - # interruption. - # - # if possible adjust the long poll timeout to account for the time - # already spent before the interrupt. + receive do + {:interrupt_shape, ^key, :server_interrupt} -> + Process.demonitor(ref, [:flush]) + Process.unlink(pid) + Process.exit(pid, :kill) + # immediately retry the same request -- if the shape_fun returns a + # different shape the client will receive a must-refetch response but + # if the shape is the same then the request will continue with no + # interruption. + # + # if possible adjust the long poll timeout to account for the time + # already spent before the interrupt. - api = reduce_long_poll_timeout(api, start_time) + api = reduce_long_poll_timeout(api, start_time) - interruptible_call(conn, api, params, shape_fun) + interruptible_call(conn, api, params, shape_fun) - {:response, ^pid, conn} -> - Process.demonitor(ref, [:flush]) + {:response, ^pid, conn} -> + Process.demonitor(ref, [:flush]) - conn + conn - {:DOWN, ^ref, :process, _pid, reason} -> - Plug.Conn.send_resp(conn, 500, inspect(reason)) + {:DOWN, ^ref, :process, _pid, reason} -> + Plug.Conn.send_resp(conn, 500, inspect(reason)) + end + after + ShapeRequestRegistry.unregister_shape(key) end - after - ShapeRequestRegistry.unregister_shape(key) - end + end) end defp interruptible_call?(params) do diff --git a/lib/phoenix/sync/electric.ex b/lib/phoenix/sync/electric.ex index 02d9f55..032c1ab 100644 --- a/lib/phoenix/sync/electric.ex +++ b/lib/phoenix/sync/electric.ex @@ -572,6 +572,23 @@ defmodule Phoenix.Sync.Electric do ) end end + + @doc false + def api_predefined_shape(conn, api, shape, response_fun) when is_function(response_fun, 2) do + case Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, shape) do + {:ok, shape_api} -> + # response_fun should return conn + response_fun.(conn, shape_api) + + # Only the embedded api will ever return an error from predefined_shape/2 + # when the stack isn't ready (or the params are invalid, e.g. bad table). + # The client adapter just configures the client with the shape + # parameters, which can't error. + {:error, response} -> + conn + |> Plug.Conn.send_resp(response.status, Enum.into(response.body, [])) + end + end end if Code.ensure_loaded?(Electric.Shapes.Api) do diff --git a/lib/phoenix/sync/router.ex b/lib/phoenix/sync/router.ex index d8468db..2a0a7ea 100644 --- a/lib/phoenix/sync/router.ex +++ b/lib/phoenix/sync/router.ex @@ -214,14 +214,14 @@ defmodule Phoenix.Sync.Router do end defp serve_shape(conn, api, shape) do - {:ok, shape_api} = Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, shape) - - conn = - conn - |> Plug.Conn.fetch_query_params() - |> Phoenix.Sync.Plug.CORS.call() - - Phoenix.Sync.Adapter.PlugApi.call(shape_api, conn, conn.params) + Phoenix.Sync.Electric.api_predefined_shape(conn, api, shape, fn conn, shape_api -> + conn = + conn + |> Plug.Conn.fetch_query_params() + |> Phoenix.Sync.Plug.CORS.call() + + Phoenix.Sync.Adapter.PlugApi.call(shape_api, conn, conn.params) + end) end end end diff --git a/lib/phoenix/sync/sandbox/api_adapter.ex b/lib/phoenix/sync/sandbox/api_adapter.ex index 5ca4fee..00d62a2 100644 --- a/lib/phoenix/sync/sandbox/api_adapter.ex +++ b/lib/phoenix/sync/sandbox/api_adapter.ex @@ -20,9 +20,10 @@ if Phoenix.Sync.sandbox_enabled?() do def call(%{shape: shape} = _adapter, conn, params) do shape_api = lookup_api!() - {:ok, shape_api} = PlugApi.predefined_shape(shape_api, shape) - PlugApi.call(shape_api, conn, params) + Phoenix.Sync.Electric.api_predefined_shape(conn, shape_api, shape, fn conn, shape_api -> + PlugApi.call(shape_api, conn, params) + end) end defp lookup_api!() do diff --git a/mix.lock b/mix.lock index 1435cb3..3fc9b55 100644 --- a/mix.lock +++ b/mix.lock @@ -15,7 +15,7 @@ "ecto": {:hex, :ecto, "3.13.2", "7d0c0863f3fc8d71d17fc3ad3b9424beae13f02712ad84191a826c7169484f01", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "669d9291370513ff56e7b7e7081b7af3283d02e046cf3d403053c557894a0b3e"}, "ecto_sql": {:hex, :ecto_sql, "3.13.2", "a07d2461d84107b3d037097c822ffdd36ed69d1cf7c0f70e12a3d1decf04e2e1", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "539274ab0ecf1a0078a6a72ef3465629e4d6018a3028095dc90f60a19c371717"}, "electric": {:hex, :electric, "1.1.9", "b0a7774556bf306ffe1b4e4ceb270d0e9dfafdd38044b3ab9f98db8e5ead7bb4", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:bandit, "~> 1.6", [hex: :bandit, repo: "hexpm", optional: false]}, {:dotenvy, "~> 1.1", [hex: :dotenvy, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:electric_cubdb, "~> 2.0", [hex: :electric_cubdb, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5", [hex: :opentelemetry, repo: "hexpm", optional: true]}, {:opentelemetry_exporter, "~> 1.8", [hex: :opentelemetry_exporter, repo: "hexpm", optional: true]}, {:opentelemetry_semantic_conventions, "~> 1.27", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}, {:opentelemetry_telemetry, "~> 1.1", [hex: :opentelemetry_telemetry, repo: "hexpm", optional: false]}, {:otel_metric_exporter, "~> 0.3.11", [hex: :otel_metric_exporter, repo: "hexpm", optional: true]}, {:pg_query_ex, "0.9.0", [hex: :pg_query_ex, repo: "hexpm", optional: false]}, {:plug, "~> 1.17", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.20", [hex: :postgrex, repo: "hexpm", optional: false]}, {:remote_ip, "~> 1.2", [hex: :remote_ip, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:sentry, "~> 11.0", [hex: :sentry, repo: "hexpm", optional: true]}, {:stream_split, "~> 0.1", [hex: :stream_split, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.1", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: true]}, {:telemetry_metrics_statsd, "~> 0.7", [hex: :telemetry_metrics_statsd, repo: "hexpm", optional: true]}, {:telemetry_poller, "~> 1.2", [hex: :telemetry_poller, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.27", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}, {:tz, "~> 0.28", [hex: :tz, repo: "hexpm", optional: false]}], "hexpm", "cd8beea1d005424fd24aa863e85a707e49b770accd23e66066f6c86524643464"}, - "electric_client": {:hex, :electric_client, "0.7.0", "ffe9ba0137e0c8a67f6935adbe42e731bbf1277f06d22cb3ad75c1d18e9647db", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.1.1", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "8ad822b150efb28282c4141c57f22008e87c19e066d7ff02eba7e3d276af781d"}, + "electric_client": {:hex, :electric_client, "0.7.2", "06f221fa7379d41ab4fb771c9cf78f26654d7c265f61faffa8c31e6b73073224", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.1.1", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "4036796cc21767917f1c1c72541b0865a5585b9b4a59ccdb15b99af9c457c97e"}, "electric_cubdb": {:hex, :electric_cubdb, "2.0.2", "36f86e3c52dc26f4e077a49fbef813b1a38d3897421cece851f149190b34c16c", [:mix], [], "hexpm", "0c0e24b31fb76ad1b33c5de2ab35c41a4ff9da153f5c1f9b15e2de78575acaf2"}, "elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},