diff --git a/Makefile b/Makefile index 7345cae..cf97318 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ DEV_IMAGE_ID = $(file < .image.dev) DOCKER ?= docker DOCKERCOMPOSE ?= docker compose -DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml +DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f docker-compose.yml -f compose.tracing.yaml REBAR ?= rebar3 TEST_CONTAINER_NAME ?= testrunner diff --git a/compose.tracing.yaml b/compose.tracing.yaml new file mode 100644 index 0000000..28e99aa --- /dev/null +++ b/compose.tracing.yaml @@ -0,0 +1,33 @@ +services: + + postgres: + environment: &otlp_enabled + OTEL_TRACES_EXPORTER: otlp + OTEL_TRACES_SAMPLER: parentbased_always_off + OTEL_EXPORTER_OTLP_PROTOCOL: http_protobuf + OTEL_EXPORTER_OTLP_ENDPOINT: http://jaeger:4318 + + testrunner: + environment: + <<: *otlp_enabled + OTEL_SERVICE_NAME: progressor_testrunner + OTEL_TRACES_SAMPLER: parentbased_always_on + depends_on: + jaeger: + condition: service_healthy + + jaeger: + image: jaegertracing/all-in-one:1.47 + environment: + - COLLECTOR_OTLP_ENABLED=true + healthcheck: + test: "/go/bin/all-in-one-linux status" + interval: 2s + timeout: 1s + retries: 20 + ports: + - 4317:4317 # OTLP gRPC receiver + - 4318:4318 # OTLP http receiver + - 5778:5778 + - 14250:14250 + - 16686:16686 diff --git a/include/otel.hrl b/include/otel.hrl new file mode 100644 index 0000000..b5f4748 --- /dev/null +++ b/include/otel.hrl @@ -0,0 +1,26 @@ +-ifndef(__progressor_otel__). +-define(__progressor_otel__, ok). + +-define(current_otel_ctx, otel_ctx:get_current()). + +-define(current_span_ctx, otel_tracer:current_span_ctx(?current_otel_ctx)). + +-define(span_exception(Class, Error, Stacktrace), + otel_span:record_exception(?current_span_ctx, Class, Error, Stacktrace, #{}) +). +-define(span_exception(Class, Error, Message, Stacktrace), + otel_span:record_exception(?current_span_ctx, Class, Error, Message, Stacktrace, #{}) +). + +-define(span_event(EventName), otel_span:add_event(?current_span_ctx, EventName, #{})). + +-define(span_attributes(Attributes), otel_span:set_attributes(?current_span_ctx, Attributes)). + +-define(tracer, opentelemetry:get_application_tracer(?MODULE)). + +-define(with_span(OtelCtx, SpanName, Fun), + otel_tracer:with_span(OtelCtx, ?tracer, SpanName, #{kind => internal}, fun(_SpanCtx) -> Fun() end) +). +-define(with_span(SpanName, Fun), ?with_span(?current_otel_ctx, SpanName, Fun)). + +-endif. diff --git a/rebar.config b/rebar.config index 1261a06..ed4bee4 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,8 @@ {recon, "2.5.6"}, {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}}, {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}, - {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}} + {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}}, + {opentelemetry_api, "1.4.0"} ]}. {xref_checks, [ @@ -38,7 +39,9 @@ {profiles, [ {test, [ {deps, [ - {meck, "0.9.2"} + {meck, "0.9.2"}, + {opentelemetry, "1.5.0"}, + {opentelemetry_exporter, "1.8.0"} ]}, {dialyzer, [{plt_extra_apps, [eunit, common_test, runtime_tools, meck]}]} ]} diff --git a/rebar.lock b/rebar.lock index 28c39ff..22f9a4d 100644 --- a/rebar.lock +++ b/rebar.lock @@ -18,8 +18,9 @@ {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},1}, {<<"mg_proto">>, {git,"https://github.com/valitydev/machinegun-proto.git", - {ref,"3decc8f8b13c9cd1701deab47781aacddd7dbc92"}}, + {ref,"cc2c27c30d30dc34c0c56fc7c7e96326d6bd6a14"}}, 0}, + {<<"opentelemetry_api">>,{pkg,<<"opentelemetry_api">>,<<"1.4.0">>},0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"recon">>,{pkg,<<"recon">>,<<"2.5.6">>},0}, @@ -34,6 +35,7 @@ {<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"kafka_protocol">>, <<"F917B6C90C8DF0DE2B40A87D6B9AE1CFCE7788E91A65818E90E40CF76111097A">>}, + {<<"opentelemetry_api">>, <<"63CA1742F92F00059298F478048DFB826F4B20D49534493D6919A0DB39B6DB04">>}, {<<"prometheus">>, <<"B95F8DE8530F541BD95951E18E355A840003672E5EDA4788C5FA6183406BA29A">>}, {<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>}, {<<"recon">>, <<"9052588E83BFEDFD9B72E1034532AEE2A5369D9D9343B61AEB7FBCE761010741">>}]}, @@ -43,6 +45,7 @@ {<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"kafka_protocol">>, <<"DF680A3706EAD8695F8B306897C0A33E8063C690DA9308DB87B462CFD7029D04">>}, + {<<"opentelemetry_api">>, <<"3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58">>}, {<<"prometheus">>, <<"719862351AABF4DF7079B05DC085D2BBCBE3AC0AC3009E956671B1D5AB88247D">>}, {<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>}, {<<"recon">>, <<"96C6799792D735CC0F0FD0F86267E9D351E63339CBE03DF9D162010CEFC26BB0">>}]} diff --git a/src/prg_worker.erl b/src/prg_worker.erl index d3bef1a..c6f7b20 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -3,6 +3,7 @@ -behaviour(gen_server). -include("progressor.hrl"). +-include("otel.hrl"). -export([start_link/2]). -export([ @@ -31,19 +32,20 @@ -spec process_task(pid(), task_header(), task()) -> ok. process_task(Worker, TaskHeader, #{process_id := _ProcessId, task_id := _TaskId} = Task) -> - gen_server:cast(Worker, {process_task, TaskHeader, Task}). + gen_server:cast(Worker, {process_task, TaskHeader, Task, ?current_otel_ctx}). -spec continuation_task(pid(), task_header(), task()) -> ok. continuation_task(Worker, TaskHeader, Task) -> - gen_server:cast(Worker, {continuation_task, TaskHeader, Task}). + gen_server:cast(Worker, {continuation_task, TaskHeader, Task, ?current_otel_ctx}). -spec next_task(pid()) -> ok. next_task(Worker) -> + _ = ?span_event(<<"next task">>), gen_server:cast(Worker, next_task). -spec process_scheduled_task(pid(), id(), task_id()) -> ok. process_scheduled_task(Worker, ProcessId, TaskId) -> - gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId}). + gen_server:cast(Worker, {process_scheduled_task, ProcessId, TaskId, ?current_otel_ctx}). %%%=================================================================== %%% Spawning and gen_server implementation @@ -61,6 +63,7 @@ init([NsId, NsOpts]) -> {continue, do_start}}. handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) -> + %% FIXME Worker w/o OTEL context, since it is not passed to init w/ `start_child` {ok, Pid} = prg_worker_sidecar:start_link(), case prg_scheduler:pop_task(NsId, self()) of {TaskHeader, Task} -> @@ -74,54 +77,76 @@ handle_call(_Request, _From, #prg_worker_state{} = State) -> {reply, ok, State}. handle_cast( - {process_task, TaskHeader, Task}, + {process_task, TaskHeader, Task, OtelCtx}, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts, sidecar_pid = Pid } = State ) -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - ProcessId = maps:get(process_id, Task), - HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), - {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), - NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), - {noreply, NewState}; + ?with_span(OtelCtx, <<"process task">>, fun() -> + ProcessId = maps:get(process_id, Task), + ?span_attributes(#{ + <<"progressor.process.type">> => atom_to_binary(element(1, TaskHeader)), + <<"progressor.process.id">> => ProcessId, + <<"progressor.process.namespace">> => NsId, + <<"progressor.process.task_id">> => maps:get(task_id, Task, undefined) + }), + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), + {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), + NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), + {noreply, NewState} + end); handle_cast( - {continuation_task, TaskHeader, Task}, + {continuation_task, TaskHeader, Task, OtelCtx}, #prg_worker_state{ + ns_id = NsId, ns_opts = #{process_step_timeout := TimeoutSec} } = State ) -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - NewState = do_process_task(TaskHeader, Task, Deadline, State), - {noreply, NewState}; + ?with_span(OtelCtx, <<"process continuation">>, fun() -> + ProcessId = maps:get(process_id, Task), + ?span_attributes(#{ + <<"progressor.process.type">> => atom_to_binary(element(1, TaskHeader)), + <<"progressor.process.id">> => ProcessId, + <<"progressor.process.namespace">> => NsId, + <<"progressor.process.task_id">> => maps:get(task_id, Task, undefined) + }), + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + NewState = do_process_task(TaskHeader, Task, Deadline, State), + {noreply, NewState} + end); handle_cast( - {process_scheduled_task, ProcessId, TaskId}, + {process_scheduled_task, ProcessId, TaskId, OtelCtx}, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts, sidecar_pid = Pid } = State ) -> - try prg_storage:capture_task(StorageOpts, NsId, TaskId) of - [] -> - %% task cancelled, blocked, already running or finished - ok = next_task(self()), - {noreply, State}; - [#{status := <<"running">>} = Task] -> - Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), - {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), - TaskHeader = create_header(Task), - NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), - {noreply, NewState} - catch - Class:Term:Stacktrace -> - logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]), - ok = next_task(self()), - {noreply, State} - end; + ?with_span(OtelCtx, <<"process scheduled task">>, fun() -> + try prg_storage:capture_task(StorageOpts, NsId, TaskId) of + [] -> + %% task cancelled, blocked, already running or finished + ok = next_task(self()), + {noreply, State}; + [#{status := <<"running">>} = Task] -> + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), + {ok, Process} = prg_worker_sidecar:get_process( + Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange + ), + TaskHeader = create_header(Task), + NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), + {noreply, NewState} + catch + Class:Term:Stacktrace -> + logger:error("process ~p. task capturing exception: ~p", [ProcessId, [Class, Term, Stacktrace]]), + ok = next_task(self()), + {noreply, State} + end + end); handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid}) -> %% kill sidecar and restart to clear memory true = erlang:unlink(CurrentPid), @@ -153,10 +178,12 @@ do_process_task( sidecar_pid = Pid } = State ) -> - ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId), - ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId), - ok = next_task(self()), - State#prg_worker_state{process = undefined}; + ?with_span(<<"remove process">>, fun() -> + ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId), + ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId), + ok = next_task(self()), + State#prg_worker_state{process = undefined} + end); do_process_task( TaskHeader, Task, @@ -378,6 +405,7 @@ success_and_unlock( last_retry_interval => 0, attempts_count => 0 }, + %% FIXME Otel must drop trace here - right before moving to other tasks {ok, [ContinuationTask | _]} = prg_worker_sidecar:complete_and_continue( Pid, Deadline, diff --git a/src/prg_worker_sidecar.erl b/src/prg_worker_sidecar.erl index a67ba4d..1483f94 100644 --- a/src/prg_worker_sidecar.erl +++ b/src/prg_worker_sidecar.erl @@ -3,6 +3,7 @@ -behaviour(gen_server). -include("progressor.hrl"). +-include("otel.hrl"). -export([start_link/0]). -export([ @@ -51,7 +52,7 @@ process(Pid, Deadline, #{namespace := NS} = NsOpts, {TaskType, _, _} = Request, Context) -> Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {process, NsOpts, Request, Context}, Timeout) + gen_server:call(Pid, {process, NsOpts, Request, Context, ?current_otel_ctx}, Timeout) end, prg_utils:with_observe(Fun, ?PROCESSING_KEY, [NS, erlang:atom_to_list(TaskType)]). @@ -71,7 +72,7 @@ complete_and_continue(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpda Fun = fun() -> gen_server:call( Pid, - {complete_and_continue, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, Task}, + {complete_and_continue, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, Task, ?current_otel_ctx}, infinity ) end, @@ -93,7 +94,7 @@ complete_and_suspend(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdat Fun = fun() -> gen_server:call( Pid, - {complete_and_suspend, StorageOpts, NsId, TaskResult, ProcessUpdates, Events}, + {complete_and_suspend, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, ?current_otel_ctx}, infinity ) end, @@ -113,7 +114,7 @@ complete_and_unlock(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdate Fun = fun() -> gen_server:call( Pid, - {complete_and_unlock, StorageOpts, NsId, TaskResult, ProcessUpdates, Events}, + {complete_and_unlock, StorageOpts, NsId, TaskResult, ProcessUpdates, Events, ?current_otel_ctx}, infinity ) end, @@ -128,7 +129,7 @@ complete_and_error(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdates Fun = fun() -> gen_server:call( Pid, - {complete_and_error, StorageOpts, NsId, TaskResult, ProcessUpdates}, + {complete_and_error, StorageOpts, NsId, TaskResult, ProcessUpdates, ?current_otel_ctx}, infinity ) end, @@ -139,7 +140,7 @@ complete_and_error(Pid, _Deadline, StorageOpts, NsId, TaskResult, ProcessUpdates remove_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> %% Timeout = Deadline - erlang:system_time(millisecond), Fun = fun() -> - gen_server:call(Pid, {remove_process, StorageOpts, NsId, ProcessId}, infinity) + gen_server:call(Pid, {remove_process, StorageOpts, NsId, ProcessId, ?current_otel_ctx}, infinity) end, prg_utils:with_observe(Fun, ?REMOVING_KEY, [erlang:atom_to_list(NsId)]). @@ -196,47 +197,58 @@ handle_call( process, #{processor := #{client := Handler, options := Options}, namespace := _NsName} = _NsOpts, Request, - Ctx + Ctx, + OtelCtx }, _From, #prg_sidecar_state{} = State ) -> - Response = - try Handler:process(Request, Options, Ctx) of - {ok, _Result} = OK -> - OK; - {error, Reason} = ERR -> - logger:error("processor error: ~p", [Reason]), - ERR; - Unsupported -> - logger:error("processor unexpected result: ~p", [Unsupported]), - {error, <<"unsupported_result">>} - catch - Class:Term:Trace -> - logger:error("processor exception: ~p", [[Class, Term, Trace]]), - {error, {exception, Class, Term}} - end, - {reply, Response, State}; + ?with_span(OtelCtx, <<"process with handler">>, fun() -> + %% NOTE Opaque callback `Handler:process/3` will have access to this + %% erlang process's dictionary and OTEL context. + Response = + try Handler:process(Request, Options, Ctx) of + {ok, _Result} = OK -> + OK; + {error, Reason} = ERR -> + logger:error("processor error: ~p", [Reason]), + ERR; + Unsupported -> + logger:error("processor unexpected result: ~p", [Unsupported]), + _ = ?span_exception(error, badmatch, <<"unsupported_result">>, []), + {error, <<"unsupported_result">>} + catch + Class:Term:Trace -> + logger:error("processor exception: ~p", [[Class, Term, Trace]]), + _ = ?span_exception(Class, Term, Trace), + {error, {exception, Class, Term}} + end, + {reply, Response, State} + end); handle_call( - {complete_and_continue, StorageOpts, NsId, TaskResult, Process, Events, Task}, + {complete_and_continue, StorageOpts, NsId, TaskResult, Process, Events, Task, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:complete_and_continue(StorageOpts, NsId, TaskResult, Process, Events, Task) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + ?with_span(OtelCtx, <<"complete and continue">>, fun() -> + Fun = fun() -> + prg_storage:complete_and_continue(StorageOpts, NsId, TaskResult, Process, Events, Task) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( - {remove_process, StorageOpts, NsId, ProcessId}, + {remove_process, StorageOpts, NsId, ProcessId, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:remove_process(StorageOpts, NsId, ProcessId) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + ?with_span(OtelCtx, <<"remove_process">>, fun() -> + Fun = fun() -> + prg_storage:remove_process(StorageOpts, NsId, ProcessId) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( {get_process, StorageOpts, NsId, ProcessId, HistoryRange}, _From, @@ -258,35 +270,41 @@ handle_call( Response = do_with_retry(Fun, ?DEFAULT_DELAY), {reply, Response, State}; handle_call( - {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events}, + {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:complete_and_suspend(StorageOpts, NsId, TaskResult, Process, Events) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + ?with_span(OtelCtx, <<"complete and suspend">>, fun() -> + Fun = fun() -> + prg_storage:complete_and_suspend(StorageOpts, NsId, TaskResult, Process, Events) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( - {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events}, + {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:complete_and_unlock(StorageOpts, NsId, TaskResult, Process, Events) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + ?with_span(OtelCtx, <<"complete and unlock">>, fun() -> + Fun = fun() -> + prg_storage:complete_and_unlock(StorageOpts, NsId, TaskResult, Process, Events) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call( - {complete_and_error, StorageOpts, NsId, TaskResult, Process}, + {complete_and_error, StorageOpts, NsId, TaskResult, Process, OtelCtx}, _From, #prg_sidecar_state{} = State ) -> - Fun = fun() -> - prg_storage:complete_and_error(StorageOpts, NsId, TaskResult, Process) - end, - Response = do_with_retry(Fun, ?DEFAULT_DELAY), - {reply, Response, State}; + ?with_span(OtelCtx, <<"complete and error">>, fun() -> + Fun = fun() -> + prg_storage:complete_and_error(StorageOpts, NsId, TaskResult, Process) + end, + Response = do_with_retry(Fun, ?DEFAULT_DELAY), + {reply, Response, State} + end); handle_call({event_sink, NsOpts, ProcessId, Events}, _From, State) -> Fun = fun() -> prg_notifier:event_sink(NsOpts, ProcessId, Events) end, Response = do_with_retry(Fun, ?DEFAULT_DELAY), @@ -313,6 +331,7 @@ code_change(_OldVsn, #prg_sidecar_state{} = State, _Extra) -> %%%=================================================================== do_with_retry(Fun, Delay) -> + _ = ?span_event(<<"try">>), try Fun() of ok = Result -> Result; @@ -320,11 +339,13 @@ do_with_retry(Fun, Delay) -> Result; Error -> _ = logger:error("result processing error: ~p", [Error]), + _ = ?span_event(<<"retryable error">>), timer:sleep(Delay), do_with_retry(Fun, Delay) catch Class:Error:Trace -> _ = logger:error("result processing exception: ~p", [[Class, Error, Trace]]), + _ = ?span_exception(Class, Error, Trace), timer:sleep(Delay), do_with_retry(Fun, Delay) end. diff --git a/src/progressor.app.src b/src/progressor.app.src index d713868..be762b3 100644 --- a/src/progressor.app.src +++ b/src/progressor.app.src @@ -11,7 +11,8 @@ epg_connector, thrift, mg_proto, - brod + brod, + opentelemetry_api ]}, {env, []}, {modules, []}, diff --git a/src/progressor.erl b/src/progressor.erl index 1428582..286188e 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -1,6 +1,7 @@ -module(progressor). -include("progressor.hrl"). +-include("otel.hrl"). -define(TASK_REPEAT_REQUEST_TIMEOUT, 1000). -define(PREPARING_KEY, progressor_request_preparing_duration_ms). @@ -29,6 +30,7 @@ id := id(), args => term(), idempotency_key => binary(), + otel_ctx => otel_ctx:t(), context => binary(), range => history_range(), options => map(), @@ -115,7 +117,7 @@ put(Req) -> Req ). --spec trace(request()) -> {ok, _Result} | {error, _Reason}. +-spec trace(request()) -> {ok, _Result :: term()} | {error, _Reason :: term()}. trace(Req) -> prg_utils:pipe( [ @@ -325,15 +327,7 @@ do_get(Req) -> do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) -> prg_storage:process_trace(StorageOpts, NsId, Id). - -do_put( - #{ - ns_opts := #{storage := StorageOpts}, - id := Id, - ns := NsId, - args := #{process := Process} = Args - } = Opts -) -> +do_put(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, args := #{process := Process} = Args} = Opts) -> #{ process_id := ProcessId } = Process, @@ -378,11 +372,14 @@ process_call(#{ns_opts := NsOpts, ns := NsId, type := Type, task := Task, worker TaskHeader = make_task_header(Type, Ref), ok = prg_worker:process_task(Worker, TaskHeader, Task), ok = prg_scheduler:release_worker(NsId, self(), Worker), + %% TODO Maybe refactor to span inside scheduler gen_server + _ = ?span_event(<<"release worker">>), %% see fun reply/2 receive {Ref, Result} -> Result after Timeout -> + _ = ?span_exception(throw, <<"timeout">>, []), {error, <<"timeout">>} end. diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 6a2fb46..459a33b 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -8,6 +8,8 @@ end_per_suite/1, init_per_group/2, end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, all/0, groups/0 ]). @@ -61,7 +63,9 @@ init_per_group(tasks_injection, C) -> Applications = [ {epg_connector, prg_ct_hook:app_env(epg_connector)}, {brod, prg_ct_hook:app_env(brod)}, - {progressor, UpdPrgConfig} + {progressor, UpdPrgConfig}, + {opentelemetry_exporter, []}, + {opentelemetry, [{span_processor, simple}]} ], _ = prg_ct_hook:start_applications(Applications), _ = prg_ct_hook:create_kafka_topics(), @@ -75,6 +79,23 @@ end_per_group(_, _) -> _ = prg_ct_hook:stop_applications(), ok. +init_per_testcase(Name, C) -> + Mod = ?MODULE, + SpanName = iolist_to_binary([atom_to_binary(Mod), ":", atom_to_binary(Name), "/1"]), + SpanCtx = otel_tracer:start_span(opentelemetry:get_application_tracer(Mod), SpanName, #{kind => internal}), + %% NOTE This also puts otel context to process dictionary + _ = otel_tracer:set_current_span(SpanCtx), + [{span_ctx, SpanCtx} | C]. + +end_per_testcase(_Name, C) -> + case lists:keyfind(span_ctx, 1, C) of + {span_ctx, SpanCtx} -> + _ = otel_span:end_span(SpanCtx), + ok; + _ -> + ok + end. + all() -> [ {group, base}, diff --git a/test/prg_ct_hook.erl b/test/prg_ct_hook.erl index eb3fd38..b458c67 100644 --- a/test/prg_ct_hook.erl +++ b/test/prg_ct_hook.erl @@ -53,7 +53,9 @@ app_list() -> [ {epg_connector, app_env(epg_connector)}, {brod, app_env(brod)}, - {progressor, app_env(progressor)} + {progressor, app_env(progressor)}, + {opentelemetry_exporter, []}, + {opentelemetry, [{span_processor, simple}]} ]. app_env(progressor) ->