-
Notifications
You must be signed in to change notification settings - Fork 49
Open
Description
Config
application:set_env(ekaf, ekaf_partition_strategy, [{ekaf_partition_strategy, custom}])
All messages are produced with a key.
What happens:
- ekaf_server.erl:439
handle_info({worker, up, WorkerUp, WorkerUpStateName, WorkerUpState, _}, StateName, #ekaf_server { topic = Topic, messages = OfflineMessages } = State) - ekaf_server_lib.erl:118
send_messages(StateName, #ekaf_server{ topic = Topic } = State, Messages) - ekaf.erl:66
produce_async_batched(Topic, Data) - ekaf_lib.erl:79
common_async(Event, Topic, [{Key,Data}|Rest])
The code waits for a reply from the TopicWorker, but the current process is the topic worker. The code would deadlock if we used gen_fsm:sync_send_all_state_event as in #50 . Here instead, we send {pick, {Key,Data}, self()} to self(), immediately receive it in clause _E and the message is discarded.
Metadata
Metadata
Assignees
Labels
No labels