Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions lib/phoenix/sync/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -231,57 +231,57 @@ defmodule Phoenix.Sync.Controller do
end

defp sync_render_call(conn, api, params, predefined_shape) do
{:ok, shape_api} = Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, predefined_shape)

Phoenix.Sync.Adapter.PlugApi.call(shape_api, CORS.call(conn), params)
Phoenix.Sync.Electric.api_predefined_shape(conn, api, predefined_shape, fn conn, shape_api ->
Phoenix.Sync.Adapter.PlugApi.call(shape_api, CORS.call(conn), params)
end)
end

defp interruptible_call(conn, api, params, shape_fun) do
predefined_shape = call_shape_fun(shape_fun)

{:ok, shape_api} = Adapter.PlugApi.predefined_shape(api, predefined_shape)
Phoenix.Sync.Electric.api_predefined_shape(conn, api, predefined_shape, fn conn, shape_api ->
{:ok, key} = ShapeRequestRegistry.register_shape(predefined_shape)

{:ok, key} = ShapeRequestRegistry.register_shape(predefined_shape)
try do
parent = self()
start_time = now()

try do
parent = self()
start_time = now()

{:ok, pid} =
Task.start_link(fn ->
send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)})
end)
{:ok, pid} =
Task.start_link(fn ->
send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)})
end)

ref = Process.monitor(pid)
ref = Process.monitor(pid)

receive do
{:interrupt_shape, ^key, :server_interrupt} ->
Process.demonitor(ref, [:flush])
Process.unlink(pid)
Process.exit(pid, :kill)
# immediately retry the same request -- if the shape_fun returns a
# different shape the client will receive a must-refetch response but
# if the shape is the same then the request will continue with no
# interruption.
#
# if possible adjust the long poll timeout to account for the time
# already spent before the interrupt.
receive do
{:interrupt_shape, ^key, :server_interrupt} ->
Process.demonitor(ref, [:flush])
Process.unlink(pid)
Process.exit(pid, :kill)
# immediately retry the same request -- if the shape_fun returns a
# different shape the client will receive a must-refetch response but
# if the shape is the same then the request will continue with no
# interruption.
#
# if possible adjust the long poll timeout to account for the time
# already spent before the interrupt.

api = reduce_long_poll_timeout(api, start_time)
api = reduce_long_poll_timeout(api, start_time)

interruptible_call(conn, api, params, shape_fun)
interruptible_call(conn, api, params, shape_fun)

{:response, ^pid, conn} ->
Process.demonitor(ref, [:flush])
{:response, ^pid, conn} ->
Process.demonitor(ref, [:flush])

conn
conn

{:DOWN, ^ref, :process, _pid, reason} ->
Plug.Conn.send_resp(conn, 500, inspect(reason))
{:DOWN, ^ref, :process, _pid, reason} ->
Plug.Conn.send_resp(conn, 500, inspect(reason))
end
after
ShapeRequestRegistry.unregister_shape(key)
end
after
ShapeRequestRegistry.unregister_shape(key)
end
end)
end

defp interruptible_call?(params) do
Expand Down
17 changes: 17 additions & 0 deletions lib/phoenix/sync/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,23 @@ defmodule Phoenix.Sync.Electric do
)
end
end

@doc false
def api_predefined_shape(conn, api, shape, response_fun) when is_function(response_fun, 2) do
case Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, shape) do
{:ok, shape_api} ->
# response_fun should return conn
response_fun.(conn, shape_api)

# Only the embedded api will ever return an error from predefined_shape/2
# when the stack isn't ready (or the params are invalid, e.g. bad table).
# The client adapter just configures the client with the shape
# parameters, which can't error.
{:error, response} ->
conn
|> Plug.Conn.send_resp(response.status, Enum.into(response.body, []))
end
end
end

if Code.ensure_loaded?(Electric.Shapes.Api) do
Expand Down
16 changes: 8 additions & 8 deletions lib/phoenix/sync/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ defmodule Phoenix.Sync.Router do
end

defp serve_shape(conn, api, shape) do
{:ok, shape_api} = Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, shape)

conn =
conn
|> Plug.Conn.fetch_query_params()
|> Phoenix.Sync.Plug.CORS.call()

Phoenix.Sync.Adapter.PlugApi.call(shape_api, conn, conn.params)
Phoenix.Sync.Electric.api_predefined_shape(conn, api, shape, fn conn, shape_api ->
conn =
conn
|> Plug.Conn.fetch_query_params()
|> Phoenix.Sync.Plug.CORS.call()

Phoenix.Sync.Adapter.PlugApi.call(shape_api, conn, conn.params)
end)
end
end
end
5 changes: 3 additions & 2 deletions lib/phoenix/sync/sandbox/api_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ if Phoenix.Sync.sandbox_enabled?() do

def call(%{shape: shape} = _adapter, conn, params) do
shape_api = lookup_api!()
{:ok, shape_api} = PlugApi.predefined_shape(shape_api, shape)

PlugApi.call(shape_api, conn, params)
Phoenix.Sync.Electric.api_predefined_shape(conn, shape_api, shape, fn conn, shape_api ->
PlugApi.call(shape_api, conn, params)
end)
end

defp lookup_api!() do
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"ecto": {:hex, :ecto, "3.13.2", "7d0c0863f3fc8d71d17fc3ad3b9424beae13f02712ad84191a826c7169484f01", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "669d9291370513ff56e7b7e7081b7af3283d02e046cf3d403053c557894a0b3e"},
"ecto_sql": {:hex, :ecto_sql, "3.13.2", "a07d2461d84107b3d037097c822ffdd36ed69d1cf7c0f70e12a3d1decf04e2e1", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "539274ab0ecf1a0078a6a72ef3465629e4d6018a3028095dc90f60a19c371717"},
"electric": {:hex, :electric, "1.1.9", "b0a7774556bf306ffe1b4e4ceb270d0e9dfafdd38044b3ab9f98db8e5ead7bb4", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:bandit, "~> 1.6", [hex: :bandit, repo: "hexpm", optional: false]}, {:dotenvy, "~> 1.1", [hex: :dotenvy, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:electric_cubdb, "~> 2.0", [hex: :electric_cubdb, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5", [hex: :opentelemetry, repo: "hexpm", optional: true]}, {:opentelemetry_exporter, "~> 1.8", [hex: :opentelemetry_exporter, repo: "hexpm", optional: true]}, {:opentelemetry_semantic_conventions, "~> 1.27", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}, {:opentelemetry_telemetry, "~> 1.1", [hex: :opentelemetry_telemetry, repo: "hexpm", optional: false]}, {:otel_metric_exporter, "~> 0.3.11", [hex: :otel_metric_exporter, repo: "hexpm", optional: true]}, {:pg_query_ex, "0.9.0", [hex: :pg_query_ex, repo: "hexpm", optional: false]}, {:plug, "~> 1.17", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.20", [hex: :postgrex, repo: "hexpm", optional: false]}, {:remote_ip, "~> 1.2", [hex: :remote_ip, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:sentry, "~> 11.0", [hex: :sentry, repo: "hexpm", optional: true]}, {:stream_split, "~> 0.1", [hex: :stream_split, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.1", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: true]}, {:telemetry_metrics_statsd, "~> 0.7", [hex: :telemetry_metrics_statsd, repo: "hexpm", optional: true]}, {:telemetry_poller, "~> 1.2", [hex: :telemetry_poller, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.27", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}, {:tz, "~> 0.28", [hex: :tz, repo: "hexpm", optional: false]}], "hexpm", "cd8beea1d005424fd24aa863e85a707e49b770accd23e66066f6c86524643464"},
"electric_client": {:hex, :electric_client, "0.7.0", "ffe9ba0137e0c8a67f6935adbe42e731bbf1277f06d22cb3ad75c1d18e9647db", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.1.1", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "8ad822b150efb28282c4141c57f22008e87c19e066d7ff02eba7e3d276af781d"},
"electric_client": {:hex, :electric_client, "0.7.2", "06f221fa7379d41ab4fb771c9cf78f26654d7c265f61faffa8c31e6b73073224", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.1.1", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "4036796cc21767917f1c1c72541b0865a5585b9b4a59ccdb15b99af9c457c97e"},
"electric_cubdb": {:hex, :electric_cubdb, "2.0.2", "36f86e3c52dc26f4e077a49fbef813b1a38d3897421cece851f149190b34c16c", [:mix], [], "hexpm", "0c0e24b31fb76ad1b33c5de2ab35c41a4ff9da153f5c1f9b15e2de78575acaf2"},
"elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
Expand Down
Loading