Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
alias Realtime.RateCounter
alias Realtime.Tenants

alias RealtimeWeb.TenantBroadcaster

def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

@impl true
def init(args) do
tenant_id = args["id"]
Logger.metadata(external_id: tenant_id, project: tenant_id)

%Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id)

rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000)

RateCounter.new(rate_counter_args)
Expand All @@ -50,7 +50,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
retry_count: 0,
slot_name: args["slot_name"] <> slot_name_suffix(),
tenant_id: tenant_id,
rate_counter_args: rate_counter_args
rate_counter_args: rate_counter_args,
subscribers_nodes_table: args["subscribers_nodes_table"]
}

{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
Expand Down Expand Up @@ -84,6 +85,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
max_changes: max_changes,
conn: conn,
tenant_id: tenant_id,
subscribers_nodes_table: subscribers_nodes_table,
rate_counter_args: rate_counter_args
} = state
) do
Expand All @@ -94,7 +96,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
record_list_changes_telemetry(time, tenant_id)

case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do
case handle_list_changes_result(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do
{:ok, row_count} ->
Backoff.reset(backoff)

Expand Down Expand Up @@ -187,6 +189,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
rows: [_ | _] = rows,
num_rows: rows_count
}},
subscribers_nodes_table,
tenant_id,
rate_counter_args
) do
Expand All @@ -201,15 +204,45 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
case collect_subscription_nodes(subscribers_nodes_table, change.subscription_ids) do
{:ok, nodes} ->
for node <- nodes do
TenantBroadcaster.pubsub_direct_broadcast(
node,
tenant_id,
topic,
change,
MessageDispatcher,
:postgres_changes
)
end

{:error, :node_not_found} ->
TenantBroadcaster.pubsub_broadcast(
tenant_id,
topic,
change,
MessageDispatcher,
:postgres_changes
)
end
end
end

{:ok, rows_count}
end

defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason}
defp handle_list_changes_result({:ok, _}, _, _, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _, _, _), do: {:error, reason}

defp collect_subscription_nodes(subscribers_nodes_table, subscription_ids) do
Enum.reduce_while(subscription_ids, {:ok, MapSet.new()}, fn subscription_id, {:ok, acc} ->
case :ets.lookup(subscribers_nodes_table, subscription_id) do
[{_, node}] -> {:cont, {:ok, MapSet.put(acc, node)}}
_ -> {:halt, {:error, :node_not_found}}
end
end)
end

def generate_record([
{"wal",
Expand Down
67 changes: 42 additions & 25 deletions lib/extensions/postgres_cdc_rls/subscription_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
defstruct [
:id,
:publication,
:subscribers_tid,
:subscribers_pids_table,
:subscribers_nodes_table,
:conn,
:delete_queue,
:no_users_ref,
Expand All @@ -37,7 +38,8 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
@type t :: %__MODULE__{
id: String.t(),
publication: String.t(),
subscribers_tid: :ets.tid(),
subscribers_pids_table: :ets.tid(),
subscribers_nodes_table: :ets.tid(),
conn: Postgrex.conn(),
oids: map(),
check_oid_ref: reference() | nil,
Expand Down Expand Up @@ -67,7 +69,12 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do

@impl true
def handle_continue({:connect, args}, _) do
%{"id" => id, "publication" => publication, "subscribers_tid" => subscribers_tid} = args
%{
"id" => id,
"publication" => publication,
"subscribers_pids_table" => subscribers_pids_table,
"subscribers_nodes_table" => subscribers_nodes_table
} = args

subscription_manager_settings = Database.from_settings(args, "realtime_subscription_manager")

Expand All @@ -85,31 +92,35 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
check_region_interval = Map.get(args, :check_region_interval, rebalance_check_interval_in_ms())
send_region_check_message(check_region_interval)

state = %State{
id: id,
conn: conn,
publication: publication,
subscribers_tid: subscribers_tid,
oids: oids,
delete_queue: %{
ref: check_delete_queue(),
queue: :queue.new()
},
no_users_ref: check_no_users(),
check_region_interval: check_region_interval
}
state =
%State{
id: id,
conn: conn,
publication: publication,
subscribers_pids_table: subscribers_pids_table,
subscribers_nodes_table: subscribers_nodes_table,
oids: oids,
delete_queue: %{
ref: check_delete_queue(),
queue: :queue.new()
},
no_users_ref: check_no_users(),
check_region_interval: check_region_interval
}

send(self(), :check_oids)
{:noreply, state}
end

@impl true
def handle_info({:subscribed, {pid, id}}, state) do
case :ets.match(state.subscribers_tid, {pid, id, :"$1", :_}) do
[] -> :ets.insert(state.subscribers_tid, {pid, id, Process.monitor(pid), node(pid)})
case :ets.match(state.subscribers_pids_table, {pid, id, :"$1", :_}) do
[] -> :ets.insert(state.subscribers_pids_table, {pid, id, Process.monitor(pid), node(pid)})
_ -> :ok
end

:ets.insert(state.subscribers_nodes_table, {UUID.string_to_binary!(id), node(pid)})

{:noreply, %{state | no_users_ts: nil}}
end

Expand All @@ -132,7 +143,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
Process.demonitor(ref, [:flush])
send(pid, :postgres_subscribe)
end
|> :ets.foldl([], state.subscribers_tid)
|> :ets.foldl([], state.subscribers_pids_table)

new_oids
end
Expand All @@ -142,19 +153,25 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do

def handle_info(
{:DOWN, _ref, :process, pid, _reason},
%State{subscribers_tid: tid, delete_queue: %{queue: q}} = state
%State{
subscribers_pids_table: subscribers_pids_table,
subscribers_nodes_table: subscribers_nodes_table,
delete_queue: %{queue: q}
} = state
) do
q1 =
case :ets.take(tid, pid) do
case :ets.take(subscribers_pids_table, pid) do
[] ->
q

values ->
for {_pid, id, _ref, _node} <- values, reduce: q do
acc ->
id
|> UUID.string_to_binary!()
|> :queue.in(acc)
bin_id = UUID.string_to_binary!(id)

:ets.delete(subscribers_nodes_table, bin_id)

:queue.in(bin_id, acc)
end
end

Expand Down Expand Up @@ -187,7 +204,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
{:noreply, %{state | delete_queue: %{ref: ref, queue: q1}}}
end

def handle_info(:check_no_users, %{subscribers_tid: tid, no_users_ts: ts} = state) do
def handle_info(:check_no_users, %{subscribers_pids_table: tid, no_users_ts: ts} = state) do
Helpers.cancel_timer(state.no_users_ref)

ts_new =
Expand Down
37 changes: 21 additions & 16 deletions lib/extensions/postgres_cdc_rls/subscriptions_checker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do

defmodule State do
@moduledoc false
defstruct [:id, :conn, :check_active_pids, :subscribers_tid, :delete_queue]
defstruct [:id, :conn, :check_active_pids, :subscribers_pids_table, :subscribers_nodes_table, :delete_queue]

@type t :: %__MODULE__{
id: String.t(),
conn: Postgrex.conn(),
check_active_pids: reference(),
subscribers_tid: :ets.tid(),
subscribers_pids_table: :ets.tid(),
subscribers_nodes_table: :ets.tid(),
delete_queue: %{
ref: reference(),
queue: :queue.queue()
Expand All @@ -47,7 +48,11 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do

@impl true
def handle_continue({:connect, args}, _) do
%{"id" => id, "subscribers_tid" => subscribers_tid} = args
%{
"id" => id,
"subscribers_pids_table" => subscribers_pids_table,
"subscribers_nodes_table" => subscribers_nodes_table
} = args

realtime_subscription_checker_settings =
Database.from_settings(args, "realtime_subscription_checker")
Expand All @@ -58,7 +63,8 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
id: id,
conn: conn,
check_active_pids: check_active_pids(),
subscribers_tid: subscribers_tid,
subscribers_pids_table: subscribers_pids_table,
subscribers_nodes_table: subscribers_nodes_table,
delete_queue: %{
ref: nil,
queue: :queue.new()
Expand All @@ -69,18 +75,14 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
end

@impl true
def handle_info(
:check_active_pids,
%State{check_active_pids: ref, subscribers_tid: tid, delete_queue: delete_queue, id: id} =
state
) do
def handle_info(:check_active_pids, %State{check_active_pids: ref, delete_queue: delete_queue, id: id} = state) do
Helpers.cancel_timer(ref)

ids =
tid
state.subscribers_pids_table
|> subscribers_by_node()
|> not_alive_pids_dist()
|> pop_not_alive_pids(tid, id)
|> pop_not_alive_pids(state.subscribers_pids_table, state.subscribers_nodes_table, id)

new_delete_queue =
if length(ids) > 0 do
Expand Down Expand Up @@ -128,10 +130,10 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do

## Internal functions

@spec pop_not_alive_pids([pid()], :ets.tid(), binary()) :: [Ecto.UUID.t()]
def pop_not_alive_pids(pids, tid, tenant_id) do
@spec pop_not_alive_pids([pid()], :ets.tid(), :ets.tid(), binary()) :: [Ecto.UUID.t()]
def pop_not_alive_pids(pids, subscribers_pids_table, subscribers_nodes_table, tenant_id) do
Enum.reduce(pids, [], fn pid, acc ->
case :ets.lookup(tid, pid) do
case :ets.lookup(subscribers_pids_table, pid) do
[] ->
Telemetry.execute(
[:realtime, :subscriptions_checker, :pid_not_found],
Expand All @@ -149,8 +151,11 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
%{tenant_id: tenant_id}
)

:ets.delete(tid, pid)
UUID.string_to_binary!(postgres_id)
:ets.delete(subscribers_pids_table, pid)
bin_id = UUID.string_to_binary!(postgres_id)

:ets.delete(subscribers_nodes_table, bin_id)
bin_id
end ++ acc
end
end)
Expand Down
11 changes: 9 additions & 2 deletions lib/extensions/postgres_cdc_rls/worker_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ defmodule Extensions.PostgresCdcRls.WorkerSupervisor do
Logger.metadata(external_id: tenant, project: tenant)
unless Api.get_tenant_by_external_id(tenant, :primary), do: raise(Exception)

tid_args = Map.merge(args, %{"subscribers_tid" => :ets.new(__MODULE__, [:public, :bag])})
subscribers_pids_table = :ets.new(__MODULE__, [:public, :bag])
subscribers_nodes_table = :ets.new(__MODULE__, [:public, :set])

tid_args =
Map.merge(args, %{
"subscribers_pids_table" => subscribers_pids_table,
"subscribers_nodes_table" => subscribers_nodes_table
})

children = [
%{
id: ReplicationPoller,
start: {ReplicationPoller, :start_link, [args]},
start: {ReplicationPoller, :start_link, [tid_args]},
restart: :transient
},
%{
Expand Down
30 changes: 30 additions & 0 deletions lib/realtime/gen_rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,36 @@ defmodule Realtime.GenRpc do
:ok
end

@doc """
Fire and forget apply(mod, func, args) on one node

Options:

- `:key` - Optional key to consistently select the same gen_rpc client to guarantee some message order between nodes
"""
@spec cast(node, module, atom, list(any), keyword()) :: :ok
def cast(node, mod, func, args, opts \\ [])

# Local
def cast(node, mod, func, args, _opts) when node == node() do
:erpc.cast(node, mod, func, args)
:ok
end

def cast(node, mod, func, args, opts)
when is_atom(node) and is_atom(mod) and is_atom(func) and is_list(args) and is_list(opts) do
key = Keyword.get(opts, :key, nil)

# Ensure this node is part of the connected nodes
if node in Node.list() do
node_key = rpc_node(node, key)

:gen_rpc.cast(node_key, mod, func, args)
end

:ok
end

@doc """
Fire and forget apply(mod, func, args) on all nodes

Expand Down
Loading