From ed3a6c51042b7a50aba5a3a7c0dd32fbc23dd4e4 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Wed, 14 Jan 2026 12:47:11 +0300 Subject: [PATCH 01/12] Adds OTEL tracing to ct and public API --- Makefile | 2 +- compose.tracing.yaml | 33 +++++++++++++++++++++++++++ rebar.config | 7 ++++-- rebar.lock | 3 +++ src/prg_scheduler.erl | 2 +- src/prg_worker.erl | 28 +++++++++++++---------- src/progressor.erl | 49 ++++++++++++++++++++++++++++------------- test/prg_base_SUITE.erl | 23 ++++++++++++++++++- test/prg_ct_hook.erl | 4 +++- 9 files changed, 118 insertions(+), 33 deletions(-) create mode 100644 compose.tracing.yaml diff --git a/Makefile b/Makefile index 7345cae..cf97318 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ DEV_IMAGE_ID = $(file < .image.dev) DOCKER ?= docker DOCKERCOMPOSE ?= docker compose -DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml +DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml -f compose.tracing.yaml REBAR ?= rebar3 TEST_CONTAINER_NAME ?= testrunner diff --git a/compose.tracing.yaml b/compose.tracing.yaml new file mode 100644 index 0000000..28e99aa --- /dev/null +++ b/compose.tracing.yaml @@ -0,0 +1,33 @@ +services: + + postgres: + environment: &otlp_enabled + OTEL_TRACES_EXPORTER: otlp + OTEL_TRACES_SAMPLER: parentbased_always_off + OTEL_EXPORTER_OTLP_PROTOCOL: http_protobuf + OTEL_EXPORTER_OTLP_ENDPOINT: http://jaeger:4318 + + testrunner: + environment: + <<: *otlp_enabled + OTEL_SERVICE_NAME: progressor_testrunner + OTEL_TRACES_SAMPLER: parentbased_always_on + depends_on: + jaeger: + condition: service_healthy + + jaeger: + image: jaegertracing/all-in-one:1.47 + environment: + - COLLECTOR_OTLP_ENABLED=true + healthcheck: + test: "/go/bin/all-in-one-linux status" + interval: 2s + timeout: 1s + retries: 20 + ports: + - 4317:4317 # OTLP gRPC receiver + - 4318:4318 # OTLP http receiver + - 5778:5778 + - 14250:14250 + - 16686:16686 diff --git a/rebar.config b/rebar.config index e6be635..e718a98 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,8 @@ {recon, "2.5.6"}, {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}}, {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}, - {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.3"}}} + {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.3"}}}, + {opentelemetry_api, "1.4.0"} ]}. {xref_checks, [ @@ -38,7 +39,9 @@ {profiles, [ {test, [ {deps, [ - {meck, "0.9.2"} + {meck, "0.9.2"}, + {opentelemetry, "1.5.0"}, + {opentelemetry_exporter, "1.8.0"} ]}, {dialyzer, [{plt_extra_apps, [eunit, common_test, runtime_tools, meck]}]} ]} diff --git a/rebar.lock b/rebar.lock index 004f7ba..a536134 100644 --- a/rebar.lock +++ b/rebar.lock @@ -20,6 +20,7 @@ {git,"https://github.com/valitydev/machinegun-proto.git", {ref,"3decc8f8b13c9cd1701deab47781aacddd7dbc92"}}, 0}, + {<<"opentelemetry_api">>,{pkg,<<"opentelemetry_api">>,<<"1.4.0">>},0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"recon">>,{pkg,<<"recon">>,<<"2.5.6">>},0}, @@ -34,6 +35,7 @@ {<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"kafka_protocol">>, <<"F917B6C90C8DF0DE2B40A87D6B9AE1CFCE7788E91A65818E90E40CF76111097A">>}, + {<<"opentelemetry_api">>, <<"63CA1742F92F00059298F478048DFB826F4B20D49534493D6919A0DB39B6DB04">>}, {<<"prometheus">>, <<"B95F8DE8530F541BD95951E18E355A840003672E5EDA4788C5FA6183406BA29A">>}, {<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>}, {<<"recon">>, <<"9052588E83BFEDFD9B72E1034532AEE2A5369D9D9343B61AEB7FBCE761010741">>}]}, @@ -43,6 +45,7 @@ {<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"kafka_protocol">>, <<"DF680A3706EAD8695F8B306897C0A33E8063C690DA9308DB87B462CFD7029D04">>}, + {<<"opentelemetry_api">>, <<"3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58">>}, {<<"prometheus">>, <<"719862351AABF4DF7079B05DC085D2BBCBE3AC0AC3009E956671B1D5AB88247D">>}, {<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>}, {<<"recon">>, <<"96C6799792D735CC0F0FD0F86267E9D351E63339CBE03DF9D162010CEFC26BB0">>}]} diff --git a/src/prg_scheduler.erl b/src/prg_scheduler.erl index 87bc45a..629aa31 100644 --- a/src/prg_scheduler.erl +++ b/src/prg_scheduler.erl @@ -233,7 +233,7 @@ do_push_task(TaskHeader, Task, State) -> FreeWorkers = State#prg_scheduler_state.free_workers, case queue:out(FreeWorkers) of {{value, Worker}, NewQueue} -> - ok = prg_worker:process_task(Worker, TaskHeader, Task), + ok = prg_worker:process_task(Worker, TaskHeader, Task, otel_ctx:get_current()), State#prg_scheduler_state{ free_workers = NewQueue }; diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 7484a76..6a9cb57 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -15,7 +15,7 @@ ]). -export([handle_continue/2]). --export([process_task/3]). +-export([process_task/4]). -export([continuation_task/3]). -export([next_task/1]). -export([process_scheduled_task/3]). @@ -29,9 +29,9 @@ %%% API %%% --spec process_task(pid(), task_header(), task()) -> ok. -process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task) -> - gen_server:cast(Worker, {process_task, TaskHeader, Task}). +-spec process_task(pid(), task_header(), task(), otel_ctx:t()) -> ok. +process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task, OtelCtx) -> + gen_server:cast(Worker, {process_task, TaskHeader, Task, OtelCtx}). -spec continuation_task(pid(), task_header(), task()) -> ok. continuation_task(Worker, TaskHeader, Task) -> @@ -64,7 +64,7 @@ handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) -> {ok, Pid} = prg_worker_sidecar:start_link(), case prg_scheduler:pop_task(NsId, self()) of {TaskHeader, Task} -> - ok = process_task(self(), TaskHeader, Task); + ok = process_task(self(), TaskHeader, Task, otel_ctx:get_current()); not_found -> skip end, @@ -74,19 +74,23 @@ handle_call(_Request, _From, #prg_worker_state{} = State) -> {reply, ok, State}. handle_cast( - {process_task, TaskHeader, Task}, + {process_task, TaskHeader, Task, OtelCtx}, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts, sidecar_pid = Pid } = State ) -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - ProcessId = maps:get(process_id, Task), - HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), - {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), - NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), - {noreply, NewState}; + otel_tracer:with_span( + OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"process">>, #{kind => internal}, fun(SpanCtx) -> + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + ProcessId = maps:get(process_id, Task), + HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), + {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), + NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), + {noreply, NewState} + end + ); handle_cast( {continuation_task, TaskHeader, Task}, #prg_worker_state{ diff --git a/src/progressor.erl b/src/progressor.erl index 8473f9b..aac568f 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -29,6 +29,7 @@ id := id(), args => term(), idempotency_key => binary(), + otel_ctx => otel_ctx:t(), context => binary(), range => history_range(), options => map(), @@ -45,6 +46,7 @@ reply(Pid, Msg) -> init(Req) -> prg_utils:pipe( [ + fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun check_idempotency/1, fun add_task/1, @@ -58,6 +60,7 @@ init(Req) -> call(Req) -> prg_utils:pipe( [ + fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun check_idempotency/1, fun(Data) -> check_process_status(Data, <<"running">>) end, @@ -86,6 +89,7 @@ repair(Req) -> simple_repair(Req) -> prg_utils:pipe( [ + fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun check_idempotency/1, fun check_process_continuation/1, @@ -99,6 +103,7 @@ simple_repair(Req) -> get(Req) -> prg_utils:pipe( [ + fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun do_get/1 ], @@ -109,16 +114,18 @@ get(Req) -> put(Req) -> prg_utils:pipe( [ + fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun do_put/1 ], Req ). --spec trace(request()) -> {ok, _Result} | {error, _Reason}. +-spec trace(request()) -> {ok, _Result :: term()} | {error, _Reason :: term()}. trace(Req) -> prg_utils:pipe( [ + fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun do_trace/1 ], @@ -165,6 +172,11 @@ cleanup_storage(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> %% Internal functions +maybe_add_otel_ctx(#{otel_ctx := _} = Opts) -> + Opts; +maybe_add_otel_ctx(Opts) -> + Opts#{otel_ctx => otel_ctx:get_current()}. + add_ns_opts(#{ns := NsId} = Opts) -> NSs = application:get_env(progressor, namespaces, #{}), case maps:get(NsId, NSs, undefined) of @@ -371,20 +383,27 @@ do_health_check(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> {critical, #{progressor_namespace => NsId, error => Detail}} end. -process_call(#{ns_opts := NsOpts, ns := NsId, type := Type, task := Task, worker := Worker}) -> - TimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC), - Timeout = TimeoutSec * 1000, - Ref = make_ref(), - TaskHeader = make_task_header(Type, Ref), - ok = prg_worker:process_task(Worker, TaskHeader, Task), - ok = prg_scheduler:release_worker(NsId, self(), Worker), - %% see fun reply/2 - receive - {Ref, Result} -> - Result - after Timeout -> - {error, <<"timeout">>} - end. +process_call(#{ns_opts := NsOpts, ns := NsId, type := Type, task := Task, worker := Worker, otel_ctx := OtelCtx}) -> + otel_tracer:with_span( + OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"call">>, #{kind => internal}, fun(SpanCtx) -> + TimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC), + Timeout = TimeoutSec * 1000, + Ref = make_ref(), + TaskHeader = make_task_header(Type, Ref), + ok = prg_worker:process_task(Worker, TaskHeader, Task, otel_ctx:get_current()), + ok = prg_scheduler:release_worker(NsId, self(), Worker), + %% TODO Maybe refactor to span inside scheduler gen_server + _ = otel_span:add_event(SpanCtx, <<"release worker">>, #{}), + %% see fun reply/2 + receive + {Ref, Result} -> + Result + after Timeout -> + _ = otel_span:record_exception(SpanCtx, throw, <<"timeout">>, [], #{}), + {error, <<"timeout">>} + end + end + ). make_task_header(init, Ref) -> {init, {self(), Ref}}; diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 15fd707..61147c5 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -8,6 +8,8 @@ end_per_suite/1, init_per_group/2, end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, all/0, groups/0 ]). @@ -61,7 +63,9 @@ init_per_group(tasks_injection, C) -> Applications = [ {epg_connector, prg_ct_hook:app_env(epg_connector)}, {brod, prg_ct_hook:app_env(brod)}, - {progressor, UpdPrgConfig} + {progressor, UpdPrgConfig}, + {opentelemetry_exporter, []}, + {opentelemetry, []} ], _ = prg_ct_hook:start_applications(Applications), _ = prg_ct_hook:create_kafka_topics(), @@ -75,6 +79,23 @@ end_per_group(_, _) -> _ = prg_ct_hook:stop_applications(), ok. +init_per_testcase(Name, C) -> + Mod = ?MODULE, + SpanName = iolist_to_binary([atom_to_binary(Mod), ":", atom_to_binary(Name), "/1"]), + SpanCtx = otel_tracer:start_span(opentelemetry:get_application_tracer(Mod), SpanName, #{kind => internal}), + %% NOTE This also puts otel context to process dictionary + _ = otel_tracer:set_current_span(SpanCtx), + [{span_ctx, SpanCtx} | C]. + +end_per_testcase(_Name, C) -> + case lists:keyfind(span_ctx, 1, C) of + {span_ctx, SpanCtx} -> + _ = otel_span:end_span(SpanCtx), + ok; + _ -> + ok + end. + all() -> [ {group, base}, diff --git a/test/prg_ct_hook.erl b/test/prg_ct_hook.erl index eb3fd38..48a241a 100644 --- a/test/prg_ct_hook.erl +++ b/test/prg_ct_hook.erl @@ -53,7 +53,9 @@ app_list() -> [ {epg_connector, app_env(epg_connector)}, {brod, app_env(brod)}, - {progressor, app_env(progressor)} + {progressor, app_env(progressor)}, + {opentelemetry_exporter, []}, + {opentelemetry, []} ]. app_env(progressor) -> From 080a08725c9b1a80d678b943c58da66d4852a9cd Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Thu, 15 Jan 2026 15:05:51 +0300 Subject: [PATCH 02/12] Wraps all worker calls in `with_span` --- src/prg_scheduler.erl | 24 ++-- src/prg_worker.erl | 125 ++++++++++++------- src/prg_worker_sidecar.erl | 238 +++++++++++++++++++++++++------------ src/progressor.app.src | 3 +- src/progressor.erl | 118 +++++++++++------- 5 files changed, 336 insertions(+), 172 deletions(-) diff --git a/src/prg_scheduler.erl b/src/prg_scheduler.erl index 629aa31..4f2d74e 100644 --- a/src/prg_scheduler.erl +++ b/src/prg_scheduler.erl @@ -40,7 +40,7 @@ push_task(NsId, TaskHeader, Task) -> -spec pop_task(namespace_id(), pid()) -> {task_header(), task()} | not_found. pop_task(NsId, Worker) -> RegName = prg_utils:registered_name(NsId, "_scheduler"), - gen_server:call(RegName, {pop_task, Worker}, infinity). + gen_server:call(RegName, {pop_task, Worker, otel_ctx:get_current()}, infinity). %% Deprecated -spec continuation_task(namespace_id(), pid(), task()) -> {task_header(), task()} | ok. @@ -95,14 +95,18 @@ init({NsId, Opts}) -> }, {ok, State}. -handle_call({pop_task, Worker}, _From, State) -> - case queue:out(State#prg_scheduler_state.ready) of - {{value, TaskData}, NewReady} -> - {reply, TaskData, State#prg_scheduler_state{ready = NewReady}}; - {empty, _} -> - Workers = State#prg_scheduler_state.free_workers, - {reply, not_found, State#prg_scheduler_state{free_workers = queue:in(Worker, Workers)}} - end; +handle_call({pop_task, Worker, OtelCtx}, _From, State) -> + otel_tracer:with_span( + OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"pop task">>, #{kind => internal}, fun(_SpanCtx) -> + case queue:out(State#prg_scheduler_state.ready) of + {{value, TaskData}, NewReady} -> + {reply, {TaskData, otel_ctx:get_current()}, State#prg_scheduler_state{ready = NewReady}}; + {empty, _} -> + Workers = State#prg_scheduler_state.free_workers, + {reply, not_found, State#prg_scheduler_state{free_workers = queue:in(Worker, Workers)}} + end + end + ); handle_call(count_workers, _From, #prg_scheduler_state{free_workers = Workers} = State) -> {reply, queue:len(Workers), State}; handle_call( @@ -233,7 +237,7 @@ do_push_task(TaskHeader, Task, State) -> FreeWorkers = State#prg_scheduler_state.free_workers, case queue:out(FreeWorkers) of {{value, Worker}, NewQueue} -> - ok = prg_worker:process_task(Worker, TaskHeader, Task, otel_ctx:get_current()), + ok = prg_worker:process_task(Worker, TaskHeader, Task), State#prg_scheduler_state{ free_workers = NewQueue }; diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 6a9cb57..081bb66 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -15,7 +15,7 @@ ]). -export([handle_continue/2]). --export([process_task/4]). +-export([process_task/3]). -export([continuation_task/3]). -export([next_task/1]). -export([process_scheduled_task/3]). @@ -29,21 +29,22 @@ %%% API %%% --spec process_task(pid(), task_header(), task(), otel_ctx:t()) -> ok. -process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task, OtelCtx) -> - gen_server:cast(Worker, {process_task, TaskHeader, Task, OtelCtx}). +-spec process_task(pid(), task_header(), task()) -> ok. +process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task) -> + gen_server:cast(Worker, {process_task, TaskHeader, Task, otel_ctx:get_current()}). -spec continuation_task(pid(), task_header(), task()) -> ok. continuation_task(Worker, TaskHeader, Task) -> - gen_server:cast(Worker, {continuation_task, TaskHeader, Task}). + gen_server:cast(Worker, {continuation_task, TaskHeader, Task, otel_ctx:get_current()}). -spec next_task(pid()) -> ok. next_task(Worker) -> + _ = otel_span:add_event(otel_tracer:current_span_ctx(otel_ctx:get_current()), <<"next task">>, #{}), gen_server:cast(Worker, next_task). -spec process_scheduled_task(pid(), id(), task_id()) -> ok. process_scheduled_task(Worker, ProcessId, TaskId) -> - gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId}). + gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId, otel_ctx:get_current()}). %%%=================================================================== %%% Spawning and gen_server implementation @@ -61,14 +62,23 @@ init([NsId, NsOpts]) -> {continue, do_start}}. handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) -> - {ok, Pid} = prg_worker_sidecar:start_link(), - case prg_scheduler:pop_task(NsId, self()) of - {TaskHeader, Task} -> - ok = process_task(self(), TaskHeader, Task, otel_ctx:get_current()); - not_found -> - skip - end, - {noreply, State#prg_worker_state{sidecar_pid = Pid}}. + otel_tracer:with_span( + otel_ctx:get_current(), + opentelemetry:get_application_tracer(?MODULE), + <<"do start">>, + #{kind => internal}, + fun(SpanCtx) -> + {ok, Pid} = prg_worker_sidecar:start_link(), + case prg_scheduler:pop_task(NsId, self()) of + {TaskHeader, Task} -> + ok = process_task(self(), TaskHeader, Task); + not_found -> + _ = otel_span:add_event(SpanCtx, <<"no task">>, #{}), + skip + end, + {noreply, State#prg_worker_state{sidecar_pid = Pid}} + end + ). handle_call(_Request, _From, #prg_worker_state{} = State) -> {reply, ok, State}. @@ -82,7 +92,11 @@ handle_cast( } = State ) -> otel_tracer:with_span( - OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"process">>, #{kind => internal}, fun(SpanCtx) -> + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"process task">>, + #{kind => internal}, + fun(_SpanCtx) -> Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, ProcessId = maps:get(process_id, Task), HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), @@ -92,40 +106,58 @@ handle_cast( end ); handle_cast( - {continuation_task, TaskHeader, Task}, + {continuation_task, TaskHeader, Task, OtelCtx}, #prg_worker_state{ ns_opts = #{process_step_timeout := TimeoutSec} } = State ) -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - NewState = do_process_task(TaskHeader, Task, Deadline, State), - {noreply, NewState}; + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"process continuation">>, + #{kind => internal}, + fun(_SpanCtx) -> + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + NewState = do_process_task(TaskHeader, Task, Deadline, State), + {noreply, NewState} + end + ); handle_cast( - {process_scheduled_task, ProcessId, TaskId}, + {process_scheduled_task, ProcessId, TaskId, OtelCtx}, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts, sidecar_pid = Pid } = State ) -> - try prg_storage:capture_task(StorageOpts, NsId, TaskId) of - [] -> - %% task cancelled, blocked, already running or finished - ok = next_task(self()), - {noreply, State}; - [#{status := <<"running">>} = Task] -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), - {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), - TaskHeader = create_header(Task), - NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), - {noreply, NewState} - catch - Class:Term:Stacktrace -> - logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]), - ok = next_task(self()), - {noreply, State} - end; + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"process scheduled task">>, + #{kind => internal}, + fun(_SpanCtx) -> + try prg_storage:capture_task(StorageOpts, NsId, TaskId) of + [] -> + %% task cancelled, blocked, already running or finished + ok = next_task(self()), + {noreply, State}; + [#{status := <<"running">>} = Task] -> + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), + {ok, Process} = prg_worker_sidecar:get_process( + Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange + ), + TaskHeader = create_header(Task), + NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), + {noreply, NewState} + catch + Class:Term:Stacktrace -> + logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]), + ok = next_task(self()), + {noreply, State} + end + end + ); handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid}) -> %% kill sidecar and restart to clear memory true = erlang:unlink(CurrentPid), @@ -157,10 +189,18 @@ do_process_task( sidecar_pid = Pid } = State ) -> - ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId), - ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId), - ok = next_task(self()), - State#prg_worker_state{process = undefined}; + otel_tracer:with_span( + otel_ctx:get_current(), + opentelemetry:get_application_tracer(?MODULE), + <<"remove process">>, + #{kind => internal}, + fun(_SpanCtx) -> + ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId), + ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId), + ok = next_task(self()), + State#prg_worker_state{process = undefined} + end + ); do_process_task( TaskHeader, Task, @@ -382,6 +422,7 @@ success_and_unlock( last_retry_interval => 0, attempts_count => 0 }, + %% FIXME Otel must drop trace here - right before moving to other tasks {ok, [ContinuationTask | _]} = prg_worker_sidecar:complete_and_continue( Pid, Deadline, diff --git a/src/prg_worker_sidecar.erl b/src/prg_worker_sidecar.erl index a67ba4d..ccf8fc7 100644 --- a/src/prg_worker_sidecar.erl +++ b/src/prg_worker_sidecar.erl @@ -51,7 +51,7 @@ process(Pid, Deadline, #{namespace := NS} = NsOpts, {TaskType, _, _} = Request, Context) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {process, NsOpts, Request, Context}, Timeout) + gen_server:call(Pid, {process, NsOpts, Request, Context, otel_ctx:get_current()}, Timeout) end, prg_utils:with_observe(Fun, ?PROCESSING_KEY, [NS, erlang:atom_to_list(TaskType)]). @@ -71,7 +71,8 @@ complete_and_continue(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpda Fun = fun() -> gen_server:call( Pid, - {complete_and_continue, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, Task}, + {complete_and_continue, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, Task, + otel_ctx:get_current()}, infinity ) end, @@ -93,7 +94,7 @@ complete_and_suspend(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdat Fun = fun() -> gen_server:call( Pid, - {complete_and_suspend, StorageOpts, NsId, TaskResult, ProcessUpdates, Events}, + {complete_and_suspend, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, otel_ctx:get_current()}, infinity ) end, @@ -113,7 +114,7 @@ complete_and_unlock(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdate Fun = fun() -> gen_server:call( Pid, - {complete_and_unlock, StorageOpts, NsId, TaskResult, ProcessUpdates, Events}, + {complete_and_unlock, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, otel_ctx:get_current()}, infinity ) end, @@ -128,7 +129,7 @@ complete_and_error(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdates Fun = fun() -> gen_server:call( Pid, - {complete_and_error, StorageOpts, NsId, TaskResult, ProcessUpdates}, + {complete_and_error, StorageOpts, NsId, TaskResult, ProcessUpdates, otel_ctx:get_current()}, infinity ) end, @@ -139,7 +140,7 @@ complete_and_error(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdates remove_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> %% Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {remove_process, StorageOpts, NsId, ProcessId}, infinity) + gen_server:call(Pid, {remove_process, StorageOpts, NsId, ProcessId, otel_ctx:get_current()}, infinity) end, prg_utils:with_observe(Fun, ?REMOVING_KEY, [erlang:atom_to_list(NsId)]). @@ -149,7 +150,7 @@ remove_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> event_sink(Pid, Deadline, #{namespace := NS} = NsOpts, ProcessId, Events) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {event_sink, NsOpts, ProcessId, Events}, Timeout) + gen_server:call(Pid, {event_sink, NsOpts, ProcessId, Events, otel_ctx:get_current()}, Timeout) end, prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "event_sink"]). @@ -158,7 +159,7 @@ event_sink(Pid, Deadline, #{namespace := NS} = NsOpts, ProcessId, Events) -> lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {lifecycle_sink, NsOpts, TaskType, ProcessId}, Timeout) + gen_server:call(Pid, {lifecycle_sink, NsOpts, TaskType, ProcessId, otel_ctx:get_current()}, Timeout) end, prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "lifecycle_sink"]). %% @@ -167,19 +168,19 @@ lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) {ok, process()} | {error, _Reason}. get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, #{}}, infinity). + gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, #{}, otel_ctx:get_current()}, infinity). -spec get_process(pid(), timestamp_ms(), storage_opts(), namespace_id(), id(), history_range()) -> {ok, process()} | {error, _Reason}. get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId, HistoryRange) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, HistoryRange}, infinity). + gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, HistoryRange, otel_ctx:get_current()}, infinity). -spec get_task(pid(), timestamp_ms(), storage_opts(), namespace_id(), task_id()) -> {ok, task()} | {error, _Reason}. get_task(Pid, _Deadline, StorageOpts, NsId, TaskId) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_task, StorageOpts, NsId, TaskId}, infinity). + gen_server:call(Pid, {get_task, StorageOpts, NsId, TaskId, otel_ctx:get_current()}, infinity). %%%=================================================================== %%% Spawning and gen_server implementation @@ -196,105 +197,180 @@ handle_call( process, #{processor := #{client := Handler, options := Options}, namespace := _NsName} = _NsOpts, Request, - Ctx + Ctx, + OtelCtx }, _From, #prg_sidecar_state{} = State ) -> - Response = - try Handler:process(Request, Options, Ctx) of - {ok, _Result} = OK -> - OK; - {error, Reason} = ERR -> - logger:error("processor error: ~p", [Reason]), - ERR; - Unsupported -> - logger:error("processor unexpected result: ~p", [Unsupported]), - {error, <<"unsupported_result">>} - catch - Class:Term:Trace -> - logger:error("processor exception: ~p", [[Class, Term, Trace]]), - {error, {exception, Class, Term}} - end, - {reply, Response, State}; + otel_tracer:with_span( + OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"process">>, #{kind => internal}, fun( + _SpanCtx + ) -> + Response = + try Handler:process(Request, Options, Ctx) of + {ok, _Result} = OK -> + OK; + {error, Reason} = ERR -> + logger:error("processor error: ~p", [Reason]), + ERR; + Unsupported -> + logger:error("processor unexpected result: ~p", [Unsupported]), + {error, <<"unsupported_result">>} + catch + Class:Term:Trace -> + logger:error("processor exception: ~p", [[Class, Term, Trace]]), + {error, {exception, Class, Term}} + end, + {reply, Response, State} + end + ); handle_call( - {complete_and_continue, StorageOpts, NsId, TaskResult, Process, Events, Task}, + {complete_and_continue, StorageOpts, NsId, TaskResult, Process, Events, Task, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:complete_and_continue(StorageOpts, NsId, TaskResult, Process, Events, Task) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"complete and continue">>, + #{kind => internal}, + fun(SpanCtx) -> + Fun = fun() -> + prg_storage:complete_and_continue(StorageOpts, NsId, TaskResult, Process, Events, Task) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ); handle_call( - {remove_process, StorageOpts, NsId, ProcessId}, + {remove_process, StorageOpts, NsId, ProcessId, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:remove_process(StorageOpts, NsId, ProcessId) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"remove process">>, + #{kind => internal}, + fun(SpanCtx) -> + Fun = fun() -> + prg_storage:remove_process(StorageOpts, NsId, ProcessId) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ); handle_call( - {get_process, StorageOpts, NsId, ProcessId, HistoryRange}, + {get_process, StorageOpts, NsId, ProcessId, HistoryRange, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:get_process(internal, StorageOpts, NsId, ProcessId, HistoryRange) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + otel_tracer:with_span( + OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"get process">>, #{kind => internal}, fun(SpanCtx) -> + Fun = fun() -> + prg_storage:get_process(internal, StorageOpts, NsId, ProcessId, HistoryRange) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ); handle_call( - {get_task, StorageOpts, NsId, TaskId}, + {get_task, StorageOpts, NsId, TaskId, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:get_task(StorageOpts, NsId, TaskId) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"get task">>, + #{kind => internal}, + fun(SpanCtx) -> + Fun = fun() -> + prg_storage:get_task(StorageOpts, NsId, TaskId) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ); handle_call( - {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events}, + {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:complete_and_suspend(StorageOpts, NsId, TaskResult, Process, Events) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"complete and suspend">>, + #{kind => internal}, + fun(SpanCtx) -> + Fun = fun() -> + prg_storage:complete_and_suspend(StorageOpts, NsId, TaskResult, Process, Events) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ); handle_call( - {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events}, + {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:complete_and_unlock(StorageOpts, NsId, TaskResult, Process, Events) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"complete and unlock">>, + #{kind => internal}, + fun(SpanCtx) -> + Fun = fun() -> + prg_storage:complete_and_unlock(StorageOpts, NsId, TaskResult, Process, Events) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ); handle_call( - {complete_and_error, StorageOpts, NsId, TaskResult, Process}, + {complete_and_error, StorageOpts, NsId, TaskResult, Process, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:complete_and_error(StorageOpts, NsId, TaskResult, Process) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; -handle_call({event_sink, NsOpts, ProcessId, Events}, _From, State) -> - Fun = fun() -> prg_notifier:event_sink(NsOpts, ProcessId, Events) end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; -handle_call({lifecycle_sink, NsOpts, TaskType, ProcessId}, _From, State) -> - Fun = fun() -> prg_notifier:lifecycle_sink(NsOpts, TaskType, ProcessId) end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}. + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"complete and error">>, + #{kind => internal}, + fun(SpanCtx) -> + Fun = fun() -> + prg_storage:complete_and_error(StorageOpts, NsId, TaskResult, Process) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ); +handle_call({event_sink, NsOpts, ProcessId, Events, OtelCtx}, _From, State) -> + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"event sink">>, + #{kind => internal}, + fun(SpanCtx) -> + Fun = fun() -> prg_notifier:event_sink(NsOpts, ProcessId, Events) end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ); +handle_call({lifecycle_sink, NsOpts, TaskType, ProcessId, OtelCtx}, _From, State) -> + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"lifecycle sink">>, + #{kind => internal}, + fun(SpanCtx) -> + Fun = fun() -> prg_notifier:lifecycle_sink(NsOpts, TaskType, ProcessId) end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), + {reply, Response, State} + end + ). handle_cast(_Request, #prg_sidecar_state{} = State) -> {noreply, State}. @@ -313,6 +389,10 @@ code_change(_OldVsn, #prg_sidecar_state{} = State, _Extra) -> %%%=================================================================== do_with_retry(Fun, Delay) -> + do_with_retry(Fun, Delay, undefined). + +do_with_retry(Fun, Delay, SpanCtx) -> + _ = otel_span:add_event(SpanCtx, <<"try">>, #{}), try Fun() of ok = Result -> Result; @@ -320,11 +400,13 @@ do_with_retry(Fun, Delay) -> Result; Error -> _ = logger:error("result processing error: ~p", [Error]), + _ = otel_span:add_event(SpanCtx, <<"retryable error">>, #{}), timer:sleep(Delay), do_with_retry(Fun, Delay) catch Class:Error:Trace -> _ = logger:error("result processing exception: ~p", [[Class, Error, Trace]]), + _ = otel_span:record_exception(SpanCtx, Class, Error, Trace, #{}), timer:sleep(Delay), do_with_retry(Fun, Delay) end. diff --git a/src/progressor.app.src b/src/progressor.app.src index d713868..be762b3 100644 --- a/src/progressor.app.src +++ b/src/progressor.app.src @@ -11,7 +11,8 @@ epg_connector, thrift, mg_proto, - brod + brod, + opentelemetry_api ]}, {env, []}, {modules, []}, diff --git a/src/progressor.erl b/src/progressor.erl index aac568f..4226628 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -75,6 +75,7 @@ call(Req) -> repair(Req) -> prg_utils:pipe( [ + fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun check_idempotency/1, fun(Data) -> check_process_status(Data, <<"error">>) end, @@ -238,10 +239,18 @@ check_process_continuation( do_simple_repair(#{task := _}) -> %% process will repaired via timeout task {ok, ok}; -do_simple_repair(#{ns_opts := #{storage := StorageOpts} = NsOpts, id := Id, ns := NsId}) -> - ok = prg_storage:repair_process(StorageOpts, NsId, Id), - ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id), - {ok, ok}. +do_simple_repair(#{ns_opts := #{storage := StorageOpts} = NsOpts, id := Id, ns := NsId, otel_ctx := OtelCtx}) -> + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"simple repair">>, + #{kind => internal}, + fun(_SpanCtx) -> + ok = prg_storage:repair_process(StorageOpts, NsId, Id), + ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id), + {ok, ok} + end + ). prepare( Fun, @@ -322,53 +331,80 @@ await_task_result(StorageOpts, NsId, KeyOrId, StepTimeout, Duration) -> ) end. -do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange} = Req) -> - case prg_storage:get_process_with_initialization(StorageOpts, NsId, Id, HistoryRange) of - {ok, #{initialization := _TaskId}} -> - %% init task not finished, await and retry - Timeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT), - timer:sleep(Timeout), - do_get(Req); - Result -> - Result - end; +do_get( + #{ + ns_opts := #{storage := StorageOpts}, + id := Id, + ns := NsId, + range := HistoryRange, + otel_ctx := OtelCtx + } = Req +) -> + otel_tracer:with_span( + OtelCtx, + opentelemetry:get_application_tracer(?MODULE), + <<"get">>, + #{kind => internal}, + fun(_SpanCtx) -> + case prg_storage:get_process_with_initialization(StorageOpts, NsId, Id, HistoryRange) of + {ok, #{initialization := _TaskId}} -> + %% init task not finished, await and retry + Timeout = application:get_env( + progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT + ), + timer:sleep(Timeout), + do_get(Req); + Result -> + Result + end + end + ); do_get(Req) -> do_get(Req#{range => #{}}). -do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) -> - prg_storage:process_trace(StorageOpts, NsId, Id). +do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, otel_ctx := OtelCtx}) -> + otel_tracer:with_span( + OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"trace">>, #{kind => internal}, fun(_SpanCtx) -> + prg_storage:process_trace(StorageOpts, NsId, Id) + end + ). do_put( #{ ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, - args := #{process := Process} = Args + args := #{process := Process} = Args, + otel_ctx := OtelCtx } = Opts ) -> - #{ - process_id := ProcessId - } = Process, - Action = maps:get(action, Args, undefined), - Context = maps:get(context, Opts, <<>>), - Now = erlang:system_time(second), - InitTask = #{ - process_id => ProcessId, - task_type => <<"init">>, - status => <<"finished">>, - args => <<>>, - context => Context, - response => term_to_binary({ok, ok}), - scheduled_time => Now, - running_time => Now, - finished_time => Now, - last_retry_interval => 0, - attempts_count => 0 - }, - ActiveTask = action_to_task(Action, ProcessId, Context), - ProcessData0 = #{process => Process, init_task => InitTask}, - ProcessData = maybe_add_key(ActiveTask, active_task, ProcessData0), - prg_storage:put_process_data(StorageOpts, NsId, Id, ProcessData). + otel_tracer:with_span( + OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"put">>, #{kind => internal}, fun(_SpanCtx) -> + #{ + process_id := ProcessId + } = Process, + Action = maps:get(action, Args, undefined), + Context = maps:get(context, Opts, <<>>), + Now = erlang:system_time(second), + InitTask = #{ + process_id => ProcessId, + task_type => <<"init">>, + status => <<"finished">>, + args => <<>>, + context => Context, + response => term_to_binary({ok, ok}), + scheduled_time => Now, + running_time => Now, + finished_time => Now, + last_retry_interval => 0, + attempts_count => 0 + }, + ActiveTask = action_to_task(Action, ProcessId, Context), + ProcessData0 = #{process => Process, init_task => InitTask}, + ProcessData = maybe_add_key(ActiveTask, active_task, ProcessData0), + prg_storage:put_process_data(StorageOpts, NsId, Id, ProcessData) + end + ). do_health_check(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> try prg_storage:health_check(StorageOpts) of @@ -390,7 +426,7 @@ process_call(#{ns_opts := NsOpts, ns := NsId, type := Type, task := Task, worker Timeout = TimeoutSec * 1000, Ref = make_ref(), TaskHeader = make_task_header(Type, Ref), - ok = prg_worker:process_task(Worker, TaskHeader, Task, otel_ctx:get_current()), + ok = prg_worker:process_task(Worker, TaskHeader, Task), ok = prg_scheduler:release_worker(NsId, self(), Worker), %% TODO Maybe refactor to span inside scheduler gen_server _ = otel_span:add_event(SpanCtx, <<"release worker">>, #{}), From 3ccee4a379870ea1279693df85bd611dddf6c7dd Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Fri, 16 Jan 2026 18:18:14 +0300 Subject: [PATCH 03/12] Instruments progressor with OTEL tracing --- include/otel.hrl | 21 ++++ src/prg_scheduler.erl | 21 ++-- src/prg_worker.erl | 141 +++++++++------------- src/prg_worker_sidecar.erl | 241 ++++++++++++++----------------------- src/progressor.erl | 145 ++++++++++------------ test/prg_base_SUITE.erl | 2 +- test/prg_ct_hook.erl | 2 +- 7 files changed, 245 insertions(+), 328 deletions(-) create mode 100644 include/otel.hrl diff --git a/include/otel.hrl b/include/otel.hrl new file mode 100644 index 0000000..b54478c --- /dev/null +++ b/include/otel.hrl @@ -0,0 +1,21 @@ +-ifndef(__progressor_otel__). +-define(__progressor_otel__, ok). + +-define(current_otel_ctx, otel_ctx:get_current()). + +-define(current_span_ctx, otel_tracer:current_span_ctx(?current_otel_ctx)). + +-define(span_exception(Class, Error, Stacktrace), + otel_span:record_exception(?current_span_ctx, Class, Error, Stacktrace, #{}) +). + +-define(span_event(EventName), otel_span:add_event(?current_span_ctx, EventName, #{})). + +-define(tracer, opentelemetry:get_application_tracer(?MODULE)). + +-define(with_span(OtelCtx, SpanName, Fun), + otel_tracer:with_span(OtelCtx, ?tracer, SpanName, #{kind => internal}, fun(_SpanCtx) -> Fun() end) +). +-define(with_span(SpanName, Fun), ?with_span(?current_otel_ctx, SpanName, Fun)). + +-endif. diff --git a/src/prg_scheduler.erl b/src/prg_scheduler.erl index 4f2d74e..425090b 100644 --- a/src/prg_scheduler.erl +++ b/src/prg_scheduler.erl @@ -1,6 +1,7 @@ -module(prg_scheduler). -include("progressor.hrl"). +-include("otel.hrl"). -behaviour(gen_server). @@ -40,7 +41,7 @@ push_task(NsId, TaskHeader, Task) -> -spec pop_task(namespace_id(), pid()) -> {task_header(), task()} | not_found. pop_task(NsId, Worker) -> RegName = prg_utils:registered_name(NsId, "_scheduler"), - gen_server:call(RegName, {pop_task, Worker, otel_ctx:get_current()}, infinity). + gen_server:call(RegName, {pop_task, Worker, ?current_otel_ctx}, infinity). %% Deprecated -spec continuation_task(namespace_id(), pid(), task()) -> {task_header(), task()} | ok. @@ -96,17 +97,15 @@ init({NsId, Opts}) -> {ok, State}. handle_call({pop_task, Worker, OtelCtx}, _From, State) -> - otel_tracer:with_span( - OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"pop task">>, #{kind => internal}, fun(_SpanCtx) -> - case queue:out(State#prg_scheduler_state.ready) of - {{value, TaskData}, NewReady} -> - {reply, {TaskData, otel_ctx:get_current()}, State#prg_scheduler_state{ready = NewReady}}; - {empty, _} -> - Workers = State#prg_scheduler_state.free_workers, - {reply, not_found, State#prg_scheduler_state{free_workers = queue:in(Worker, Workers)}} - end + ?with_span(OtelCtx, <<"pop task">>, fun() -> + case queue:out(State#prg_scheduler_state.ready) of + {{value, TaskData}, NewReady} -> + {reply, TaskData, State#prg_scheduler_state{ready = NewReady}}; + {empty, _} -> + Workers = State#prg_scheduler_state.free_workers, + {reply, not_found, State#prg_scheduler_state{free_workers = queue:in(Worker, Workers)}} end - ); + end); handle_call(count_workers, _From, #prg_scheduler_state{free_workers = Workers} = State) -> {reply, queue:len(Workers), State}; handle_call( diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 081bb66..6511755 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -3,6 +3,7 @@ -behaviour(gen_server). -include("progressor.hrl"). +-include("otel.hrl"). -export([start_link/2]). -export([ @@ -31,20 +32,20 @@ -spec process_task(pid(), task_header(), task()) -> ok. process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task) -> - gen_server:cast(Worker, {process_task, TaskHeader, Task, otel_ctx:get_current()}). + gen_server:cast(Worker, {process_task, TaskHeader, Task, ?current_otel_ctx}). -spec continuation_task(pid(), task_header(), task()) -> ok. continuation_task(Worker, TaskHeader, Task) -> - gen_server:cast(Worker, {continuation_task, TaskHeader, Task, otel_ctx:get_current()}). + gen_server:cast(Worker, {continuation_task, TaskHeader, Task, ?current_otel_ctx}). -spec next_task(pid()) -> ok. next_task(Worker) -> - _ = otel_span:add_event(otel_tracer:current_span_ctx(otel_ctx:get_current()), <<"next task">>, #{}), + _ = ?span_event(<<"next task">>), gen_server:cast(Worker, next_task). -spec process_scheduled_task(pid(), id(), task_id()) -> ok. process_scheduled_task(Worker, ProcessId, TaskId) -> - gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId, otel_ctx:get_current()}). + gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId, ?current_otel_ctx}). %%%=================================================================== %%% Spawning and gen_server implementation @@ -62,23 +63,17 @@ init([NsId, NsOpts]) -> {continue, do_start}}. handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) -> - otel_tracer:with_span( - otel_ctx:get_current(), - opentelemetry:get_application_tracer(?MODULE), - <<"do start">>, - #{kind => internal}, - fun(SpanCtx) -> - {ok, Pid} = prg_worker_sidecar:start_link(), - case prg_scheduler:pop_task(NsId, self()) of - {TaskHeader, Task} -> - ok = process_task(self(), TaskHeader, Task); - not_found -> - _ = otel_span:add_event(SpanCtx, <<"no task">>, #{}), - skip - end, - {noreply, State#prg_worker_state{sidecar_pid = Pid}} - end - ). + ?with_span(<<"start">>, fun() -> + {ok, Pid} = prg_worker_sidecar:start_link(), + case prg_scheduler:pop_task(NsId, self()) of + {TaskHeader, Task} -> + ok = process_task(self(), TaskHeader, Task); + not_found -> + _ = ?span_event(<<"no task">>), + skip + end, + {noreply, State#prg_worker_state{sidecar_pid = Pid}} + end). handle_call(_Request, _From, #prg_worker_state{} = State) -> {reply, ok, State}. @@ -91,37 +86,25 @@ handle_cast( sidecar_pid = Pid } = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"process task">>, - #{kind => internal}, - fun(_SpanCtx) -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - ProcessId = maps:get(process_id, Task), - HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), - {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), - NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), - {noreply, NewState} - end - ); + ?with_span(OtelCtx, <<"process task">>, fun() -> + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + ProcessId = maps:get(process_id, Task), + HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), + {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), + NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), + {noreply, NewState} + end); handle_cast( {continuation_task, TaskHeader, Task, OtelCtx}, #prg_worker_state{ ns_opts = #{process_step_timeout := TimeoutSec} } = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"process continuation">>, - #{kind => internal}, - fun(_SpanCtx) -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - NewState = do_process_task(TaskHeader, Task, Deadline, State), - {noreply, NewState} - end - ); + ?with_span(OtelCtx, <<"process continuation">>, fun() -> + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + NewState = do_process_task(TaskHeader, Task, Deadline, State), + {noreply, NewState} + end); handle_cast( {process_scheduled_task, ProcessId, TaskId, OtelCtx}, #prg_worker_state{ @@ -130,34 +113,28 @@ handle_cast( sidecar_pid = Pid } = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"process scheduled task">>, - #{kind => internal}, - fun(_SpanCtx) -> - try prg_storage:capture_task(StorageOpts, NsId, TaskId) of - [] -> - %% task cancelled, blocked, already running or finished - ok = next_task(self()), - {noreply, State}; - [#{status := <<"running">>} = Task] -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), - {ok, Process} = prg_worker_sidecar:get_process( - Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange - ), - TaskHeader = create_header(Task), - NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), - {noreply, NewState} - catch - Class:Term:Stacktrace -> - logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]), - ok = next_task(self()), - {noreply, State} - end + ?with_span(OtelCtx, <<"process scheduled task">>, fun() -> + try prg_storage:capture_task(StorageOpts, NsId, TaskId) of + [] -> + %% task cancelled, blocked, already running or finished + ok = next_task(self()), + {noreply, State}; + [#{status := <<"running">>} = Task] -> + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), + {ok, Process} = prg_worker_sidecar:get_process( + Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange + ), + TaskHeader = create_header(Task), + NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), + {noreply, NewState} + catch + Class:Term:Stacktrace -> + logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]), + ok = next_task(self()), + {noreply, State} end - ); + end); handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid}) -> %% kill sidecar and restart to clear memory true = erlang:unlink(CurrentPid), @@ -189,18 +166,12 @@ do_process_task( sidecar_pid = Pid } = State ) -> - otel_tracer:with_span( - otel_ctx:get_current(), - opentelemetry:get_application_tracer(?MODULE), - <<"remove process">>, - #{kind => internal}, - fun(_SpanCtx) -> - ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId), - ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId), - ok = next_task(self()), - State#prg_worker_state{process = undefined} - end - ); + ?with_span(<<"remove process">>, fun() -> + ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId), + ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId), + ok = next_task(self()), + State#prg_worker_state{process = undefined} + end); do_process_task( TaskHeader, Task, diff --git a/src/prg_worker_sidecar.erl b/src/prg_worker_sidecar.erl index ccf8fc7..59e029c 100644 --- a/src/prg_worker_sidecar.erl +++ b/src/prg_worker_sidecar.erl @@ -3,6 +3,7 @@ -behaviour(gen_server). -include("progressor.hrl"). +-include("otel.hrl"). -export([start_link/0]). -export([ @@ -51,7 +52,7 @@ process(Pid, Deadline, #{namespace := NS} = NsOpts, {TaskType, _, _} = Request, Context) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {process, NsOpts, Request, Context, otel_ctx:get_current()}, Timeout) + gen_server:call(Pid, {process, NsOpts, Request, Context, ?current_otel_ctx}, Timeout) end, prg_utils:with_observe(Fun, ?PROCESSING_KEY, [NS, erlang:atom_to_list(TaskType)]). @@ -71,8 +72,7 @@ complete_and_continue(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpda Fun = fun() -> gen_server:call( Pid, - {complete_and_continue, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, Task, - otel_ctx:get_current()}, + {complete_and_continue, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, Task, ?current_otel_ctx}, infinity ) end, @@ -94,7 +94,7 @@ complete_and_suspend(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdat Fun = fun() -> gen_server:call( Pid, - {complete_and_suspend, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, otel_ctx:get_current()}, + {complete_and_suspend, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, ?current_otel_ctx}, infinity ) end, @@ -114,7 +114,7 @@ complete_and_unlock(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdate Fun = fun() -> gen_server:call( Pid, - {complete_and_unlock, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, otel_ctx:get_current()}, + {complete_and_unlock, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, ?current_otel_ctx}, infinity ) end, @@ -129,7 +129,7 @@ complete_and_error(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdates Fun = fun() -> gen_server:call( Pid, - {complete_and_error, StorageOpts, NsId, TaskResult, ProcessUpdates, otel_ctx:get_current()}, + {complete_and_error, StorageOpts, NsId, TaskResult, ProcessUpdates, ?current_otel_ctx}, infinity ) end, @@ -140,7 +140,7 @@ complete_and_error(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdates remove_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> %% Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {remove_process, StorageOpts, NsId, ProcessId, otel_ctx:get_current()}, infinity) + gen_server:call(Pid, {remove_process, StorageOpts, NsId, ProcessId, ?current_otel_ctx}, infinity) end, prg_utils:with_observe(Fun, ?REMOVING_KEY, [erlang:atom_to_list(NsId)]). @@ -150,7 +150,7 @@ remove_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> event_sink(Pid, Deadline, #{namespace := NS} = NsOpts, ProcessId, Events) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {event_sink, NsOpts, ProcessId, Events, otel_ctx:get_current()}, Timeout) + gen_server:call(Pid, {event_sink, NsOpts, ProcessId, Events, ?current_otel_ctx}, Timeout) end, prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "event_sink"]). @@ -159,7 +159,7 @@ event_sink(Pid, Deadline, #{namespace := NS} = NsOpts, ProcessId, Events) -> lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {lifecycle_sink, NsOpts, TaskType, ProcessId, otel_ctx:get_current()}, Timeout) + gen_server:call(Pid, {lifecycle_sink, NsOpts, TaskType, ProcessId, ?current_otel_ctx}, Timeout) end, prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "lifecycle_sink"]). %% @@ -168,19 +168,19 @@ lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) {ok, process()} | {error, _Reason}. get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, #{}, otel_ctx:get_current()}, infinity). + gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, #{}, ?current_otel_ctx}, infinity). -spec get_process(pid(), timestamp_ms(), storage_opts(), namespace_id(), id(), history_range()) -> {ok, process()} | {error, _Reason}. get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId, HistoryRange) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, HistoryRange, otel_ctx:get_current()}, infinity). + gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, HistoryRange, ?current_otel_ctx}, infinity). -spec get_task(pid(), timestamp_ms(), storage_opts(), namespace_id(), task_id()) -> {ok, task()} | {error, _Reason}. get_task(Pid, _Deadline, StorageOpts, NsId, TaskId) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_task, StorageOpts, NsId, TaskId, otel_ctx:get_current()}, infinity). + gen_server:call(Pid, {get_task, StorageOpts, NsId, TaskId, ?current_otel_ctx}, infinity). %%%=================================================================== %%% Spawning and gen_server implementation @@ -203,174 +203,120 @@ handle_call( _From, #prg_sidecar_state{} = State ) -> - otel_tracer:with_span( - OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"process">>, #{kind => internal}, fun( - _SpanCtx - ) -> - Response = - try Handler:process(Request, Options, Ctx) of - {ok, _Result} = OK -> - OK; - {error, Reason} = ERR -> - logger:error("processor error: ~p", [Reason]), - ERR; - Unsupported -> - logger:error("processor unexpected result: ~p", [Unsupported]), - {error, <<"unsupported_result">>} - catch - Class:Term:Trace -> - logger:error("processor exception: ~p", [[Class, Term, Trace]]), - {error, {exception, Class, Term}} - end, - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"process">>, fun() -> + Response = + try Handler:process(Request, Options, Ctx) of + {ok, _Result} = OK -> + OK; + {error, Reason} = ERR -> + logger:error("processor error: ~p", [Reason]), + ERR; + Unsupported -> + logger:error("processor unexpected result: ~p", [Unsupported]), + {error, <<"unsupported_result">>} + catch + Class:Term:Trace -> + logger:error("processor exception: ~p", [[Class, Term, Trace]]), + {error, {exception, Class, Term}} + end, + {reply, Response, State} + end); handle_call( {complete_and_continue, StorageOpts, NsId, TaskResult, Process, Events, Task, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"complete and continue">>, - #{kind => internal}, - fun(SpanCtx) -> - Fun = fun() -> - prg_storage:complete_and_continue(StorageOpts, NsId, TaskResult, Process, Events, Task) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"complete and continue">>, fun() -> + Fun = fun() -> + prg_storage:complete_and_continue(StorageOpts, NsId, TaskResult, Process, Events, Task) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( {remove_process, StorageOpts, NsId, ProcessId, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"remove process">>, - #{kind => internal}, - fun(SpanCtx) -> - Fun = fun() -> - prg_storage:remove_process(StorageOpts, NsId, ProcessId) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"remove_process">>, fun() -> + Fun = fun() -> + prg_storage:remove_process(StorageOpts, NsId, ProcessId) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( {get_process, StorageOpts, NsId, ProcessId, HistoryRange, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - otel_tracer:with_span( - OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"get process">>, #{kind => internal}, fun(SpanCtx) -> - Fun = fun() -> - prg_storage:get_process(internal, StorageOpts, NsId, ProcessId, HistoryRange) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"get process">>, fun() -> + Fun = fun() -> + prg_storage:get_process(internal, StorageOpts, NsId, ProcessId, HistoryRange) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( {get_task, StorageOpts, NsId, TaskId, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"get task">>, - #{kind => internal}, - fun(SpanCtx) -> - Fun = fun() -> - prg_storage:get_task(StorageOpts, NsId, TaskId) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"get task">>, fun() -> + Fun = fun() -> + prg_storage:get_task(StorageOpts, NsId, TaskId) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"complete and suspend">>, - #{kind => internal}, - fun(SpanCtx) -> - Fun = fun() -> - prg_storage:complete_and_suspend(StorageOpts, NsId, TaskResult, Process, Events) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"complete and suspend">>, fun() -> + Fun = fun() -> + prg_storage:complete_and_suspend(StorageOpts, NsId, TaskResult, Process, Events) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"complete and unlock">>, - #{kind => internal}, - fun(SpanCtx) -> - Fun = fun() -> - prg_storage:complete_and_unlock(StorageOpts, NsId, TaskResult, Process, Events) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"complete and unlock">>, fun() -> + Fun = fun() -> + prg_storage:complete_and_unlock(StorageOpts, NsId, TaskResult, Process, Events) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( {complete_and_error, StorageOpts, NsId, TaskResult, Process, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"complete and error">>, - #{kind => internal}, - fun(SpanCtx) -> - Fun = fun() -> - prg_storage:complete_and_error(StorageOpts, NsId, TaskResult, Process) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"complete and error">>, fun() -> + Fun = fun() -> + prg_storage:complete_and_error(StorageOpts, NsId, TaskResult, Process) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call({event_sink, NsOpts, ProcessId, Events, OtelCtx}, _From, State) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"event sink">>, - #{kind => internal}, - fun(SpanCtx) -> - Fun = fun() -> prg_notifier:event_sink(NsOpts, ProcessId, Events) end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ); + ?with_span(OtelCtx, <<"event sink">>, fun() -> + Fun = fun() -> prg_notifier:event_sink(NsOpts, ProcessId, Events) end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call({lifecycle_sink, NsOpts, TaskType, ProcessId, OtelCtx}, _From, State) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"lifecycle sink">>, - #{kind => internal}, - fun(SpanCtx) -> - Fun = fun() -> prg_notifier:lifecycle_sink(NsOpts, TaskType, ProcessId) end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY, SpanCtx), - {reply, Response, State} - end - ). + ?with_span(OtelCtx, <<"lifecycle sink">>, fun() -> + Fun = fun() -> prg_notifier:lifecycle_sink(NsOpts, TaskType, ProcessId) end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end). handle_cast(_Request, #prg_sidecar_state{} = State) -> {noreply, State}. @@ -389,10 +335,7 @@ code_change(_OldVsn, #prg_sidecar_state{} = State, _Extra) -> %%%=================================================================== do_with_retry(Fun, Delay) -> - do_with_retry(Fun, Delay, undefined). - -do_with_retry(Fun, Delay, SpanCtx) -> - _ = otel_span:add_event(SpanCtx, <<"try">>, #{}), + _ = ?span_event(<<"try">>), try Fun() of ok = Result -> Result; @@ -400,13 +343,13 @@ do_with_retry(Fun, Delay, SpanCtx) -> Result; Error -> _ = logger:error("result processing error: ~p", [Error]), - _ = otel_span:add_event(SpanCtx, <<"retryable error">>, #{}), + _ = ?span_event(<<"retryable error">>), timer:sleep(Delay), do_with_retry(Fun, Delay) catch Class:Error:Trace -> _ = logger:error("result processing exception: ~p", [[Class, Error, Trace]]), - _ = otel_span:record_exception(SpanCtx, Class, Error, Trace, #{}), + _ = ?span_exception(Class, Error, Trace), timer:sleep(Delay), do_with_retry(Fun, Delay) end. diff --git a/src/progressor.erl b/src/progressor.erl index 4226628..ca1230e 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -1,6 +1,7 @@ -module(progressor). -include("progressor.hrl"). +-include("otel.hrl"). -define(TASK_REPEAT_REQUEST_TIMEOUT, 1000). -define(PREPARING_KEY, progressor_request_preparing_duration_ms). @@ -176,7 +177,7 @@ cleanup_storage(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> maybe_add_otel_ctx(#{otel_ctx := _} = Opts) -> Opts; maybe_add_otel_ctx(Opts) -> - Opts#{otel_ctx => otel_ctx:get_current()}. + Opts#{otel_ctx => ?current_otel_ctx}. add_ns_opts(#{ns := NsId} = Opts) -> NSs = application:get_env(progressor, namespaces, #{}), @@ -240,17 +241,11 @@ do_simple_repair(#{task := _}) -> %% process will repaired via timeout task {ok, ok}; do_simple_repair(#{ns_opts := #{storage := StorageOpts} = NsOpts, id := Id, ns := NsId, otel_ctx := OtelCtx}) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"simple repair">>, - #{kind => internal}, - fun(_SpanCtx) -> - ok = prg_storage:repair_process(StorageOpts, NsId, Id), - ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id), - {ok, ok} - end - ). + ?with_span(OtelCtx, <<"simple repair">>, fun() -> + ok = prg_storage:repair_process(StorageOpts, NsId, Id), + ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id), + {ok, ok} + end). prepare( Fun, @@ -340,34 +335,26 @@ do_get( otel_ctx := OtelCtx } = Req ) -> - otel_tracer:with_span( - OtelCtx, - opentelemetry:get_application_tracer(?MODULE), - <<"get">>, - #{kind => internal}, - fun(_SpanCtx) -> - case prg_storage:get_process_with_initialization(StorageOpts, NsId, Id, HistoryRange) of - {ok, #{initialization := _TaskId}} -> - %% init task not finished, await and retry - Timeout = application:get_env( - progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT - ), - timer:sleep(Timeout), - do_get(Req); - Result -> - Result - end + ?with_span(OtelCtx, <<"get">>, fun() -> + case prg_storage:get_process_with_initialization(StorageOpts, NsId, Id, HistoryRange) of + {ok, #{initialization := _TaskId}} -> + %% init task not finished, await and retry + Timeout = application:get_env( + progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT + ), + timer:sleep(Timeout), + do_get(Req); + Result -> + Result end - ); + end); do_get(Req) -> do_get(Req#{range => #{}}). do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, otel_ctx := OtelCtx}) -> - otel_tracer:with_span( - OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"trace">>, #{kind => internal}, fun(_SpanCtx) -> - prg_storage:process_trace(StorageOpts, NsId, Id) - end - ). + ?with_span(OtelCtx, <<"trace">>, fun() -> + prg_storage:process_trace(StorageOpts, NsId, Id) + end). do_put( #{ @@ -378,33 +365,31 @@ do_put( otel_ctx := OtelCtx } = Opts ) -> - otel_tracer:with_span( - OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"put">>, #{kind => internal}, fun(_SpanCtx) -> - #{ - process_id := ProcessId - } = Process, - Action = maps:get(action, Args, undefined), - Context = maps:get(context, Opts, <<>>), - Now = erlang:system_time(second), - InitTask = #{ - process_id => ProcessId, - task_type => <<"init">>, - status => <<"finished">>, - args => <<>>, - context => Context, - response => term_to_binary({ok, ok}), - scheduled_time => Now, - running_time => Now, - finished_time => Now, - last_retry_interval => 0, - attempts_count => 0 - }, - ActiveTask = action_to_task(Action, ProcessId, Context), - ProcessData0 = #{process => Process, init_task => InitTask}, - ProcessData = maybe_add_key(ActiveTask, active_task, ProcessData0), - prg_storage:put_process_data(StorageOpts, NsId, Id, ProcessData) - end - ). + ?with_span(OtelCtx, <<"put">>, fun() -> + #{ + process_id := ProcessId + } = Process, + Action = maps:get(action, Args, undefined), + Context = maps:get(context, Opts, <<>>), + Now = erlang:system_time(second), + InitTask = #{ + process_id => ProcessId, + task_type => <<"init">>, + status => <<"finished">>, + args => <<>>, + context => Context, + response => term_to_binary({ok, ok}), + scheduled_time => Now, + running_time => Now, + finished_time => Now, + last_retry_interval => 0, + attempts_count => 0 + }, + ActiveTask = action_to_task(Action, ProcessId, Context), + ProcessData0 = #{process => Process, init_task => InitTask}, + ProcessData = maybe_add_key(ActiveTask, active_task, ProcessData0), + prg_storage:put_process_data(StorageOpts, NsId, Id, ProcessData) + end). do_health_check(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> try prg_storage:health_check(StorageOpts) of @@ -420,26 +405,24 @@ do_health_check(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> end. process_call(#{ns_opts := NsOpts, ns := NsId, type := Type, task := Task, worker := Worker, otel_ctx := OtelCtx}) -> - otel_tracer:with_span( - OtelCtx, opentelemetry:get_application_tracer(?MODULE), <<"call">>, #{kind => internal}, fun(SpanCtx) -> - TimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC), - Timeout = TimeoutSec * 1000, - Ref = make_ref(), - TaskHeader = make_task_header(Type, Ref), - ok = prg_worker:process_task(Worker, TaskHeader, Task), - ok = prg_scheduler:release_worker(NsId, self(), Worker), - %% TODO Maybe refactor to span inside scheduler gen_server - _ = otel_span:add_event(SpanCtx, <<"release worker">>, #{}), - %% see fun reply/2 - receive - {Ref, Result} -> - Result - after Timeout -> - _ = otel_span:record_exception(SpanCtx, throw, <<"timeout">>, [], #{}), - {error, <<"timeout">>} - end + ?with_span(OtelCtx, <<"call">>, fun() -> + TimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC), + Timeout = TimeoutSec * 1000, + Ref = make_ref(), + TaskHeader = make_task_header(Type, Ref), + ok = prg_worker:process_task(Worker, TaskHeader, Task), + ok = prg_scheduler:release_worker(NsId, self(), Worker), + %% TODO Maybe refactor to span inside scheduler gen_server + _ = ?span_event(<<"release worker">>), + %% see fun reply/2 + receive + {Ref, Result} -> + Result + after Timeout -> + _ = ?span_exception(throw, <<"timeout">>, []), + {error, <<"timeout">>} end - ). + end). make_task_header(init, Ref) -> {init, {self(), Ref}}; diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 61147c5..f992a66 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -65,7 +65,7 @@ init_per_group(tasks_injection, C) -> {brod, prg_ct_hook:app_env(brod)}, {progressor, UpdPrgConfig}, {opentelemetry_exporter, []}, - {opentelemetry, []} + {opentelemetry, [{span_processor, simple}]} ], _ = prg_ct_hook:start_applications(Applications), _ = prg_ct_hook:create_kafka_topics(), diff --git a/test/prg_ct_hook.erl b/test/prg_ct_hook.erl index 48a241a..b458c67 100644 --- a/test/prg_ct_hook.erl +++ b/test/prg_ct_hook.erl @@ -55,7 +55,7 @@ app_list() -> {brod, app_env(brod)}, {progressor, app_env(progressor)}, {opentelemetry_exporter, []}, - {opentelemetry, []} + {opentelemetry, [{span_processor, simple}]} ]. app_env(progressor) -> From dbd7758b7d7cd538fca0395a7bec4ff23e67bc65 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 19 Jan 2026 17:44:07 +0300 Subject: [PATCH 04/12] Adds span attributes w/ process/task ids --- include/otel.hrl | 5 +++++ src/prg_scheduler.erl | 21 +++++++++------------ src/prg_worker.erl | 19 +++++++++++++++++-- src/prg_worker_sidecar.erl | 2 ++ 4 files changed, 33 insertions(+), 14 deletions(-) diff --git a/include/otel.hrl b/include/otel.hrl index b54478c..b5f4748 100644 --- a/include/otel.hrl +++ b/include/otel.hrl @@ -8,9 +8,14 @@ -define(span_exception(Class, Error, Stacktrace), otel_span:record_exception(?current_span_ctx, Class, Error, Stacktrace, #{}) ). +-define(span_exception(Class, Error, Message, Stacktrace), + otel_span:record_exception(?current_span_ctx, Class, Error, Message, Stacktrace, #{}) +). -define(span_event(EventName), otel_span:add_event(?current_span_ctx, EventName, #{})). +-define(span_attributes(Attributes), otel_span:set_attributes(?current_span_ctx, Attributes)). + -define(tracer, opentelemetry:get_application_tracer(?MODULE)). -define(with_span(OtelCtx, SpanName, Fun), diff --git a/src/prg_scheduler.erl b/src/prg_scheduler.erl index 425090b..87bc45a 100644 --- a/src/prg_scheduler.erl +++ b/src/prg_scheduler.erl @@ -1,7 +1,6 @@ -module(prg_scheduler). -include("progressor.hrl"). --include("otel.hrl"). -behaviour(gen_server). @@ -41,7 +40,7 @@ push_task(NsId, TaskHeader, Task) -> -spec pop_task(namespace_id(), pid()) -> {task_header(), task()} | not_found. pop_task(NsId, Worker) -> RegName = prg_utils:registered_name(NsId, "_scheduler"), - gen_server:call(RegName, {pop_task, Worker, ?current_otel_ctx}, infinity). + gen_server:call(RegName, {pop_task, Worker}, infinity). %% Deprecated -spec continuation_task(namespace_id(), pid(), task()) -> {task_header(), task()} | ok. @@ -96,16 +95,14 @@ init({NsId, Opts}) -> }, {ok, State}. -handle_call({pop_task, Worker, OtelCtx}, _From, State) -> - ?with_span(OtelCtx, <<"pop task">>, fun() -> - case queue:out(State#prg_scheduler_state.ready) of - {{value, TaskData}, NewReady} -> - {reply, TaskData, State#prg_scheduler_state{ready = NewReady}}; - {empty, _} -> - Workers = State#prg_scheduler_state.free_workers, - {reply, not_found, State#prg_scheduler_state{free_workers = queue:in(Worker, Workers)}} - end - end); +handle_call({pop_task, Worker}, _From, State) -> + case queue:out(State#prg_scheduler_state.ready) of + {{value, TaskData}, NewReady} -> + {reply, TaskData, State#prg_scheduler_state{ready = NewReady}}; + {empty, _} -> + Workers = State#prg_scheduler_state.free_workers, + {reply, not_found, State#prg_scheduler_state{free_workers = queue:in(Worker, Workers)}} + end; handle_call(count_workers, _From, #prg_scheduler_state{free_workers = Workers} = State) -> {reply, queue:len(Workers), State}; handle_call( diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 6511755..8091f9f 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -63,7 +63,8 @@ init([NsId, NsOpts]) -> {continue, do_start}}. handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) -> - ?with_span(<<"start">>, fun() -> + %% FIXME Worker w/o OTEL context, since it is not passed to init w/ `start_child` + ?with_span(<<"do start">>, fun() -> {ok, Pid} = prg_worker_sidecar:start_link(), case prg_scheduler:pop_task(NsId, self()) of {TaskHeader, Task} -> @@ -87,8 +88,14 @@ handle_cast( } = State ) -> ?with_span(OtelCtx, <<"process task">>, fun() -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, ProcessId = maps:get(process_id, Task), + ?span_attributes(#{ + <<"progressor.process.type">> => atom_to_binary(element(1, TaskHeader)), + <<"progressor.process.id">> => ProcessId, + <<"progressor.process.namespace">> => NsId, + <<"progressor.process.task_id">> => maps:get(task_id, Task, undefined) + }), + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), @@ -97,10 +104,18 @@ handle_cast( handle_cast( {continuation_task, TaskHeader, Task, OtelCtx}, #prg_worker_state{ + ns_id = NsId, ns_opts = #{process_step_timeout := TimeoutSec} } = State ) -> ?with_span(OtelCtx, <<"process continuation">>, fun() -> + ProcessId = maps:get(process_id, Task), + ?span_attributes(#{ + <<"progressor.process.type">> => atom_to_binary(element(1, TaskHeader)), + <<"progressor.process.id">> => ProcessId, + <<"progressor.process.namespace">> => NsId, + <<"progressor.process.task_id">> => maps:get(task_id, Task, undefined) + }), Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, NewState = do_process_task(TaskHeader, Task, Deadline, State), {noreply, NewState} diff --git a/src/prg_worker_sidecar.erl b/src/prg_worker_sidecar.erl index 59e029c..f7432a8 100644 --- a/src/prg_worker_sidecar.erl +++ b/src/prg_worker_sidecar.erl @@ -213,10 +213,12 @@ handle_call( ERR; Unsupported -> logger:error("processor unexpected result: ~p", [Unsupported]), + _ = ?span_exception(error, badmatch, <<"unsupported_result">>, []), {error, <<"unsupported_result">>} catch Class:Term:Trace -> logger:error("processor exception: ~p", [[Class, Term, Trace]]), + _ = ?span_exception(Class, Term, Trace), {error, {exception, Class, Term}} end, {reply, Response, State} From 7abc0a13cab604a53c3f04757984e30fad74e5fa Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 19 Jan 2026 17:44:31 +0300 Subject: [PATCH 05/12] Fixes dev container permissions Updates composed environment w/ dev user w/ host's uid/gid for proper ownership of files in $PWD written by container --- Dockerfile.dev | 6 ++++++ Makefile | 6 ++++-- docker-compose.yml | 2 ++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/Dockerfile.dev b/Dockerfile.dev index e4cfa53..2584f8c 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -9,6 +9,12 @@ ARG TARGETARCH RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}-linux-${TARGETARCH}.tar.gz" \ | tar -xvz -C /usr/local/bin/ +# Setup user for matching permissions in mounted volumes +ARG USER_UID +ARG USER_GID +RUN useradd -ms /bin/bash dev +USER dev + # Set env ENV CHARSET=UTF-8 ENV LANG=C.UTF-8 diff --git a/Makefile b/Makefile index cf97318..d3edc3b 100644 --- a/Makefile +++ b/Makefile @@ -13,10 +13,12 @@ DOTENV := $(shell grep -v '^\#' .env) # Development images DEV_IMAGE_TAG = $(TEST_CONTAINER_NAME)-dev DEV_IMAGE_ID = $(file < .image.dev) +USER_UID=$(shell id -u) +USER_GID=$(shell id -g) DOCKER ?= docker DOCKERCOMPOSE ?= docker compose -DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml -f compose.tracing.yaml +DOCKERCOMPOSE_W_ENV = USER_UID=$(USER_UID) USER_GID=$(USER_GID) DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml -f compose.tracing.yaml REBAR ?= rebar3 TEST_CONTAINER_NAME ?= testrunner @@ -55,7 +57,7 @@ wc-%: dev-image $(DOCKER_RUN) $(DEV_IMAGE_TAG) make $* wdeps-shell: dev-image - $(DOCKERCOMPOSE_RUN) $(TEST_CONTAINER_NAME) su; \ + $(DOCKERCOMPOSE_RUN) $(TEST_CONTAINER_NAME) bash; \ $(DOCKERCOMPOSE_W_ENV) down wdeps-%: dev-image diff --git a/docker-compose.yml b/docker-compose.yml index 862be9a..b90cd2d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,8 @@ services: args: OTP_VERSION: $OTP_VERSION THRIFT_VERSION: $THRIFT_VERSION + USER_UID: $USER_UID + USER_GID: $USER_GID volumes: - .:$PWD hostname: progressor From 2117737a289609446d4767fbd5dbee57992ee13e Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 26 Jan 2026 11:11:43 +0300 Subject: [PATCH 06/12] Lower OTEL-span-verbosity of process worker --- include/otel.hrl | 15 +++- rebar.lock | 2 +- src/prg_worker.erl | 21 +++--- src/prg_worker_sidecar.erl | 62 ++++++++-------- src/progressor.erl | 143 +++++++++++++++---------------------- 5 files changed, 106 insertions(+), 137 deletions(-) diff --git a/include/otel.hrl b/include/otel.hrl index b5f4748..aff2f10 100644 --- a/include/otel.hrl +++ b/include/otel.hrl @@ -18,9 +18,18 @@ -define(tracer, opentelemetry:get_application_tracer(?MODULE)). --define(with_span(OtelCtx, SpanName, Fun), - otel_tracer:with_span(OtelCtx, ?tracer, SpanName, #{kind => internal}, fun(_SpanCtx) -> Fun() end) -). +%% NOTE See `otel_tracer_default:with_span/5` +-define(with_span(Ctx, SpanName, Fun), begin + SpanCtx = otel_tracer:start_span(Ctx, ?tracer, SpanName, #{kind => internal}), + Ctx1 = otel_tracer:set_current_span(Ctx, SpanCtx), + Token = otel_ctx:attach(Ctx1), + try + Fun() + after + _ = otel_span_ets:end_span(SpanCtx), + otel_ctx:detach(Token) + end +end). -define(with_span(SpanName, Fun), ?with_span(?current_otel_ctx, SpanName, Fun)). -endif. diff --git a/rebar.lock b/rebar.lock index 951b61d..22f9a4d 100644 --- a/rebar.lock +++ b/rebar.lock @@ -18,7 +18,7 @@ {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},1}, {<<"mg_proto">>, {git,"https://github.com/valitydev/machinegun-proto.git", - {ref,"3decc8f8b13c9cd1701deab47781aacddd7dbc92"}}, + {ref,"cc2c27c30d30dc34c0c56fc7c7e96326d6bd6a14"}}, 0}, {<<"opentelemetry_api">>,{pkg,<<"opentelemetry_api">>,<<"1.4.0">>},0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 79bb172..22cc980 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -41,7 +41,7 @@ continuation_task(Worker, TaskHeader, Task) -> -spec next_task(pid()) -> ok. next_task(Worker) -> _ = ?span_event(<<"next task">>), - gen_server:cast(Worker, next_task). + gen_server:cast(Worker, next_9task). -spec process_scheduled_task(pid(), id(), task_id()) -> ok. process_scheduled_task(Worker, ProcessId, TaskId) -> @@ -64,17 +64,14 @@ init([NsId, NsOpts]) -> handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) -> %% FIXME Worker w/o OTEL context, since it is not passed to init w/ `start_child` - ?with_span(<<"do start">>, fun() -> - {ok, Pid} = prg_worker_sidecar:start_link(), - case prg_scheduler:pop_task(NsId, self()) of - {TaskHeader, Task} -> - ok = process_task(self(), TaskHeader, Task); - not_found -> - _ = ?span_event(<<"no task">>), - skip - end, - {noreply, State#prg_worker_state{sidecar_pid = Pid}} - end). + {ok, Pid} = prg_worker_sidecar:start_link(), + case prg_scheduler:pop_task(NsId, self()) of + {TaskHeader, Task} -> + ok = process_task(self(), TaskHeader, Task); + not_found -> + skip + end, + {noreply, State#prg_worker_state{sidecar_pid = Pid}}. handle_call(_Request, _From, #prg_worker_state{} = State) -> {reply, ok, State}. diff --git a/src/prg_worker_sidecar.erl b/src/prg_worker_sidecar.erl index f7432a8..1483f94 100644 --- a/src/prg_worker_sidecar.erl +++ b/src/prg_worker_sidecar.erl @@ -150,7 +150,7 @@ remove_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> event_sink(Pid, Deadline, #{namespace := NS} = NsOpts, ProcessId, Events) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {event_sink, NsOpts, ProcessId, Events, ?current_otel_ctx}, Timeout) + gen_server:call(Pid, {event_sink, NsOpts, ProcessId, Events}, Timeout) end, prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "event_sink"]). @@ -159,7 +159,7 @@ event_sink(Pid, Deadline, #{namespace := NS} = NsOpts, ProcessId, Events) -> lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {lifecycle_sink, NsOpts, TaskType, ProcessId, ?current_otel_ctx}, Timeout) + gen_server:call(Pid, {lifecycle_sink, NsOpts, TaskType, ProcessId}, Timeout) end, prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "lifecycle_sink"]). %% @@ -168,19 +168,19 @@ lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) {ok, process()} | {error, _Reason}. get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, #{}, ?current_otel_ctx}, infinity). + gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, #{}}, infinity). -spec get_process(pid(), timestamp_ms(), storage_opts(), namespace_id(), id(), history_range()) -> {ok, process()} | {error, _Reason}. get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId, HistoryRange) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, HistoryRange, ?current_otel_ctx}, infinity). + gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, HistoryRange}, infinity). -spec get_task(pid(), timestamp_ms(), storage_opts(), namespace_id(), task_id()) -> {ok, task()} | {error, _Reason}. get_task(Pid, _Deadline, StorageOpts, NsId, TaskId) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_task, StorageOpts, NsId, TaskId, ?current_otel_ctx}, infinity). + gen_server:call(Pid, {get_task, StorageOpts, NsId, TaskId}, infinity). %%%=================================================================== %%% Spawning and gen_server implementation @@ -203,7 +203,9 @@ handle_call( _From, #prg_sidecar_state{} = State ) -> - ?with_span(OtelCtx, <<"process">>, fun() -> + ?with_span(OtelCtx, <<"process with handler">>, fun() -> + %% NOTE Opaque callback `Handler:process/3` will have access to this + %% erlang process's dictionary and OTEL context. Response = try Handler:process(Request, Options, Ctx) of {ok, _Result} = OK -> @@ -248,29 +250,25 @@ handle_call( {reply, Response, State} end); handle_call( - {get_process, StorageOpts, NsId, ProcessId, HistoryRange, OtelCtx}, + {get_process, StorageOpts, NsId, ProcessId, HistoryRange}, _From, #prg_sidecar_state{} = State ) -> - ?with_span(OtelCtx, <<"get process">>, fun() -> - Fun = fun() -> - prg_storage:get_process(internal, StorageOpts, NsId, ProcessId, HistoryRange) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State} - end); + Fun = fun() -> + prg_storage:get_process(internal, StorageOpts, NsId, ProcessId, HistoryRange) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State}; handle_call( - {get_task, StorageOpts, NsId, TaskId, OtelCtx}, + {get_task, StorageOpts, NsId, TaskId}, _From, #prg_sidecar_state{} = State ) -> - ?with_span(OtelCtx, <<"get task">>, fun() -> - Fun = fun() -> - prg_storage:get_task(StorageOpts, NsId, TaskId) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State} - end); + Fun = fun() -> + prg_storage:get_task(StorageOpts, NsId, TaskId) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State}; handle_call( {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, @@ -307,18 +305,14 @@ handle_call( Response = do_with_retry(Fun, ?DEFAULT_DELAY), {reply, Response, State} end); -handle_call({event_sink, NsOpts, ProcessId, Events, OtelCtx}, _From, State) -> - ?with_span(OtelCtx, <<"event sink">>, fun() -> - Fun = fun() -> prg_notifier:event_sink(NsOpts, ProcessId, Events) end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State} - end); -handle_call({lifecycle_sink, NsOpts, TaskType, ProcessId, OtelCtx}, _From, State) -> - ?with_span(OtelCtx, <<"lifecycle sink">>, fun() -> - Fun = fun() -> prg_notifier:lifecycle_sink(NsOpts, TaskType, ProcessId) end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State} - end). +handle_call({event_sink, NsOpts, ProcessId, Events}, _From, State) -> + Fun = fun() -> prg_notifier:event_sink(NsOpts, ProcessId, Events) end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State}; +handle_call({lifecycle_sink, NsOpts, TaskType, ProcessId}, _From, State) -> + Fun = fun() -> prg_notifier:lifecycle_sink(NsOpts, TaskType, ProcessId) end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State}. handle_cast(_Request, #prg_sidecar_state{} = State) -> {noreply, State}. diff --git a/src/progressor.erl b/src/progressor.erl index b60db0f..286188e 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -47,7 +47,6 @@ reply(Pid, Msg) -> init(Req) -> prg_utils:pipe( [ - fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun check_idempotency/1, fun add_task/1, @@ -61,7 +60,6 @@ init(Req) -> call(Req) -> prg_utils:pipe( [ - fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun check_idempotency/1, fun(Data) -> check_process_status(Data, <<"running">>) end, @@ -76,7 +74,6 @@ call(Req) -> repair(Req) -> prg_utils:pipe( [ - fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun check_idempotency/1, fun(Data) -> check_process_status(Data, <<"error">>) end, @@ -91,7 +88,6 @@ repair(Req) -> simple_repair(Req) -> prg_utils:pipe( [ - fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun check_idempotency/1, fun check_process_continuation/1, @@ -105,7 +101,6 @@ simple_repair(Req) -> get(Req) -> prg_utils:pipe( [ - fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun do_get/1 ], @@ -116,7 +111,6 @@ get(Req) -> put(Req) -> prg_utils:pipe( [ - fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun do_put/1 ], @@ -127,7 +121,6 @@ put(Req) -> trace(Req) -> prg_utils:pipe( [ - fun maybe_add_otel_ctx/1, fun add_ns_opts/1, fun do_trace/1 ], @@ -174,11 +167,6 @@ cleanup_storage(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> %% Internal functions -maybe_add_otel_ctx(#{otel_ctx := _} = Opts) -> - Opts; -maybe_add_otel_ctx(Opts) -> - Opts#{otel_ctx => ?current_otel_ctx}. - add_ns_opts(#{ns := NsId} = Opts) -> NSs = application:get_env(progressor, namespaces, #{}), case maps:get(NsId, NSs, undefined) of @@ -240,12 +228,10 @@ check_process_continuation( do_simple_repair(#{task := _}) -> %% process will repaired via timeout task {ok, ok}; -do_simple_repair(#{ns_opts := #{storage := StorageOpts} = NsOpts, id := Id, ns := NsId, otel_ctx := OtelCtx}) -> - ?with_span(OtelCtx, <<"simple repair">>, fun() -> - ok = prg_storage:repair_process(StorageOpts, NsId, Id), - ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id), - {ok, ok} - end). +do_simple_repair(#{ns_opts := #{storage := StorageOpts} = NsOpts, id := Id, ns := NsId}) -> + ok = prg_storage:repair_process(StorageOpts, NsId, Id), + ok = prg_notifier:lifecycle_sink(NsOpts, repair, Id), + {ok, ok}. prepare( Fun, @@ -326,60 +312,45 @@ await_task_result(StorageOpts, NsId, KeyOrId, StepTimeout, Duration) -> ) end. -do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange, otel_ctx := OtelCtx} = Req) -> - ?with_span(OtelCtx, <<"get">>, fun() -> - case prg_storage:get_process(external, StorageOpts, NsId, Id, HistoryRange) of - {ok, #{status := <<"init">>}} -> - %% init task not finished, await and retry - Timeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT), - timer:sleep(Timeout), - do_get(Req); - Result -> - Result - end - end); +do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange} = Req) -> + case prg_storage:get_process(external, StorageOpts, NsId, Id, HistoryRange) of + {ok, #{status := <<"init">>}} -> + %% init task not finished, await and retry + Timeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT), + timer:sleep(Timeout), + do_get(Req); + Result -> + Result + end; do_get(Req) -> do_get(Req#{range => #{}}). -do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, otel_ctx := OtelCtx}) -> - ?with_span(OtelCtx, <<"trace">>, fun() -> - prg_storage:process_trace(StorageOpts, NsId, Id) - end). - -do_put( +do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) -> + prg_storage:process_trace(StorageOpts, NsId, Id). +do_put(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, args := #{process := Process} = Args} = Opts) -> #{ - ns_opts := #{storage := StorageOpts}, - id := Id, - ns := NsId, - args := #{process := Process} = Args, - otel_ctx := OtelCtx - } = Opts -) -> - ?with_span(OtelCtx, <<"put">>, fun() -> - #{ - process_id := ProcessId - } = Process, - Action = maps:get(action, Args, undefined), - Context = maps:get(context, Opts, <<>>), - Now = erlang:system_time(second), - InitTask = #{ - process_id => ProcessId, - task_type => <<"init">>, - status => <<"finished">>, - args => <<>>, - context => Context, - response => term_to_binary({ok, ok}), - scheduled_time => Now, - running_time => Now, - finished_time => Now, - last_retry_interval => 0, - attempts_count => 0 - }, - ActiveTask = action_to_task(Action, ProcessId, Context), - ProcessData0 = #{process => Process, init_task => InitTask}, - ProcessData = maybe_add_key(ActiveTask, active_task, ProcessData0), - prg_storage:put_process_data(StorageOpts, NsId, Id, ProcessData) - end). + process_id := ProcessId + } = Process, + Action = maps:get(action, Args, undefined), + Context = maps:get(context, Opts, <<>>), + Now = erlang:system_time(second), + InitTask = #{ + process_id => ProcessId, + task_type => <<"init">>, + status => <<"finished">>, + args => <<>>, + context => Context, + response => term_to_binary({ok, ok}), + scheduled_time => Now, + running_time => Now, + finished_time => Now, + last_retry_interval => 0, + attempts_count => 0 + }, + ActiveTask = action_to_task(Action, ProcessId, Context), + ProcessData0 = #{process => Process, init_task => InitTask}, + ProcessData = maybe_add_key(ActiveTask, active_task, ProcessData0), + prg_storage:put_process_data(StorageOpts, NsId, Id, ProcessData). do_health_check(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> try prg_storage:health_check(StorageOpts) of @@ -394,25 +365,23 @@ do_health_check(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) -> {critical, #{progressor_namespace => NsId, error => Detail}} end. -process_call(#{ns_opts := NsOpts, ns := NsId, type := Type, task := Task, worker := Worker, otel_ctx := OtelCtx}) -> - ?with_span(OtelCtx, <<"call">>, fun() -> - TimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC), - Timeout = TimeoutSec * 1000, - Ref = make_ref(), - TaskHeader = make_task_header(Type, Ref), - ok = prg_worker:process_task(Worker, TaskHeader, Task), - ok = prg_scheduler:release_worker(NsId, self(), Worker), - %% TODO Maybe refactor to span inside scheduler gen_server - _ = ?span_event(<<"release worker">>), - %% see fun reply/2 - receive - {Ref, Result} -> - Result - after Timeout -> - _ = ?span_exception(throw, <<"timeout">>, []), - {error, <<"timeout">>} - end - end). +process_call(#{ns_opts := NsOpts, ns := NsId, type := Type, task := Task, worker := Worker}) -> + TimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC), + Timeout = TimeoutSec * 1000, + Ref = make_ref(), + TaskHeader = make_task_header(Type, Ref), + ok = prg_worker:process_task(Worker, TaskHeader, Task), + ok = prg_scheduler:release_worker(NsId, self(), Worker), + %% TODO Maybe refactor to span inside scheduler gen_server + _ = ?span_event(<<"release worker">>), + %% see fun reply/2 + receive + {Ref, Result} -> + Result + after Timeout -> + _ = ?span_exception(throw, <<"timeout">>, []), + {error, <<"timeout">>} + end. make_task_header(init, Ref) -> {init, {self(), Ref}}; From 52c771adb44df02a2f2d662c80b1ff19d4b22b84 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 26 Jan 2026 11:23:28 +0300 Subject: [PATCH 07/12] Fixes dialyzer and linter issues --- include/otel.hrl | 15 +++------------ src/prg_worker.erl | 2 +- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/include/otel.hrl b/include/otel.hrl index aff2f10..b5f4748 100644 --- a/include/otel.hrl +++ b/include/otel.hrl @@ -18,18 +18,9 @@ -define(tracer, opentelemetry:get_application_tracer(?MODULE)). -%% NOTE See `otel_tracer_default:with_span/5` --define(with_span(Ctx, SpanName, Fun), begin - SpanCtx = otel_tracer:start_span(Ctx, ?tracer, SpanName, #{kind => internal}), - Ctx1 = otel_tracer:set_current_span(Ctx, SpanCtx), - Token = otel_ctx:attach(Ctx1), - try - Fun() - after - _ = otel_span_ets:end_span(SpanCtx), - otel_ctx:detach(Token) - end -end). +-define(with_span(OtelCtx, SpanName, Fun), + otel_tracer:with_span(OtelCtx, ?tracer, SpanName, #{kind => internal}, fun(_SpanCtx) -> Fun() end) +). -define(with_span(SpanName, Fun), ?with_span(?current_otel_ctx, SpanName, Fun)). -endif. diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 22cc980..c6f7b20 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -41,7 +41,7 @@ continuation_task(Worker, TaskHeader, Task) -> -spec next_task(pid()) -> ok. next_task(Worker) -> _ = ?span_event(<<"next task">>), - gen_server:cast(Worker, next_9task). + gen_server:cast(Worker, next_task). -spec process_scheduled_task(pid(), id(), task_id()) -> ok. process_scheduled_task(Worker, ProcessId, TaskId) -> From fe74755a9e8658fab2f91a5690e4890574127744 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 26 Jan 2026 11:27:51 +0300 Subject: [PATCH 08/12] Adds cache distinction for deps in CI runner --- .github/workflows/erlang-checks.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/erlang-checks.yaml b/.github/workflows/erlang-checks.yaml index 7d1512b..730cda2 100644 --- a/.github/workflows/erlang-checks.yaml +++ b/.github/workflows/erlang-checks.yaml @@ -37,5 +37,6 @@ jobs: use-thrift: true thrift-version: ${{ needs.setup.outputs.thrift-version }} run-ct-with-compose: true + cache-version: v2 use-coveralls: true upload-coverage: false From 1cf479d3baf5b9d4264aced911c6d87364eb2c00 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 26 Jan 2026 12:56:03 +0300 Subject: [PATCH 09/12] Bumps CI action --- .github/workflows/erlang-checks.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/erlang-checks.yaml b/.github/workflows/erlang-checks.yaml index 730cda2..82ea768 100644 --- a/.github/workflows/erlang-checks.yaml +++ b/.github/workflows/erlang-checks.yaml @@ -30,13 +30,13 @@ jobs: run: name: Run checks needs: setup - uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.17 + uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1 with: otp-version: ${{ needs.setup.outputs.otp-version }} rebar-version: ${{ needs.setup.outputs.rebar-version }} use-thrift: true thrift-version: ${{ needs.setup.outputs.thrift-version }} run-ct-with-compose: true - cache-version: v2 + cache-version: v1 use-coveralls: true upload-coverage: false From 3115bcd759076fbfe500c38c38f62384f25995e4 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 26 Jan 2026 13:13:00 +0300 Subject: [PATCH 10/12] Adds pinned tmp workflow action w/ uid and gid set up --- .github/workflows/erlang-checks.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/erlang-checks.yaml b/.github/workflows/erlang-checks.yaml index 82ea768..4444e3b 100644 --- a/.github/workflows/erlang-checks.yaml +++ b/.github/workflows/erlang-checks.yaml @@ -30,7 +30,7 @@ jobs: run: name: Run checks needs: setup - uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1 + uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@89a39d0659c4602caba6c87d75e3878bdf15a67f with: otp-version: ${{ needs.setup.outputs.otp-version }} rebar-version: ${{ needs.setup.outputs.rebar-version }} From 8456b974386560a2477280e8fbc18f83ed28197c Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 26 Jan 2026 13:22:58 +0300 Subject: [PATCH 11/12] Bumps CI action hash --- .github/workflows/erlang-checks.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/erlang-checks.yaml b/.github/workflows/erlang-checks.yaml index 4444e3b..49edc6b 100644 --- a/.github/workflows/erlang-checks.yaml +++ b/.github/workflows/erlang-checks.yaml @@ -30,7 +30,7 @@ jobs: run: name: Run checks needs: setup - uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@89a39d0659c4602caba6c87d75e3878bdf15a67f + uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@527977799bb5b747c0110d4b47c9b6154f5ba988 with: otp-version: ${{ needs.setup.outputs.otp-version }} rebar-version: ${{ needs.setup.outputs.rebar-version }} From cd1c8a64bd5a95264a74626d8262a2898c9aed8b Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 26 Jan 2026 16:54:28 +0300 Subject: [PATCH 12/12] Revert "Fixes dev container permissions" This reverts commit 7abc0a13cab604a53c3f04757984e30fad74e5fa. For some reason there're CI problems within github runner cache and `_build` dir's permissions on dependency upgrade, e.g. `epg_connector`. --- .github/workflows/erlang-checks.yaml | 3 +-- Dockerfile.dev | 6 ------ Makefile | 6 ++---- docker-compose.yml | 2 -- 4 files changed, 3 insertions(+), 14 deletions(-) diff --git a/.github/workflows/erlang-checks.yaml b/.github/workflows/erlang-checks.yaml index 49edc6b..7d1512b 100644 --- a/.github/workflows/erlang-checks.yaml +++ b/.github/workflows/erlang-checks.yaml @@ -30,13 +30,12 @@ jobs: run: name: Run checks needs: setup - uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@527977799bb5b747c0110d4b47c9b6154f5ba988 + uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.17 with: otp-version: ${{ needs.setup.outputs.otp-version }} rebar-version: ${{ needs.setup.outputs.rebar-version }} use-thrift: true thrift-version: ${{ needs.setup.outputs.thrift-version }} run-ct-with-compose: true - cache-version: v1 use-coveralls: true upload-coverage: false diff --git a/Dockerfile.dev b/Dockerfile.dev index 2584f8c..e4cfa53 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -9,12 +9,6 @@ ARG TARGETARCH RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}-linux-${TARGETARCH}.tar.gz" \ | tar -xvz -C /usr/local/bin/ -# Setup user for matching permissions in mounted volumes -ARG USER_UID -ARG USER_GID -RUN useradd -ms /bin/bash dev -USER dev - # Set env ENV CHARSET=UTF-8 ENV LANG=C.UTF-8 diff --git a/Makefile b/Makefile index d3edc3b..cf97318 100644 --- a/Makefile +++ b/Makefile @@ -13,12 +13,10 @@ DOTENV := $(shell grep -v '^\#' .env) # Development images DEV_IMAGE_TAG = $(TEST_CONTAINER_NAME)-dev DEV_IMAGE_ID = $(file < .image.dev) -USER_UID=$(shell id -u) -USER_GID=$(shell id -g) DOCKER ?= docker DOCKERCOMPOSE ?= docker compose -DOCKERCOMPOSE_W_ENV = USER_UID=$(USER_UID) USER_GID=$(USER_GID) DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml -f compose.tracing.yaml +DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml -f compose.tracing.yaml REBAR ?= rebar3 TEST_CONTAINER_NAME ?= testrunner @@ -57,7 +55,7 @@ wc-%: dev-image $(DOCKER_RUN) $(DEV_IMAGE_TAG) make $* wdeps-shell: dev-image - $(DOCKERCOMPOSE_RUN) $(TEST_CONTAINER_NAME) bash; \ + $(DOCKERCOMPOSE_RUN) $(TEST_CONTAINER_NAME) su; \ $(DOCKERCOMPOSE_W_ENV) down wdeps-%: dev-image diff --git a/docker-compose.yml b/docker-compose.yml index b90cd2d..862be9a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,8 +9,6 @@ services: args: OTP_VERSION: $OTP_VERSION THRIFT_VERSION: $THRIFT_VERSION - USER_UID: $USER_UID - USER_GID: $USER_GID volumes: - .:$PWD hostname: progressor