
Commit ed6eb04

improve participant loop for cpu (#45)
* exp: improve IPC
* use futures concurrency grouping
* update with more tunings
* reduce shard count
* set to 8x shards
* tune shards
* improve cpu cache by egress batching better
* revert to tokio spawn
1 parent 9af2cb9 commit ed6eb04

File tree

11 files changed: +77 additions, -49 deletions


Cargo.lock

Lines changed: 1 addition & 0 deletions

Makefile

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ SCCACHE := $(shell which sccache)
 # >> force-frame-pointers is required for profiling, add ~1% CPU overhead
 # >> allow-multiple-definition is required so we can replace system allocator for C projects, like openssl, ffmpeg, etc.
 # These CFLAGS are needed to make sure everything compiles with frame-pointer otherwise segfault will happen.
-export RUSTFLAGS := $(RUSTFLAGS) -C link-arg=-Wl,--allow-multiple-definition -C force-frame-pointers=yes
+export RUSTFLAGS := $(RUSTFLAGS) -C link-arg=-Wl,--allow-multiple-definition -C force-frame-pointers=yes --cfg tracing_unstable --cfg tokio_unstable
 export CFLAGS := $(CFLAGS) -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer
 export CXXFLAGS := $(CXXFLAGS) -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer
 # ===
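The two new --cfg flags ride along in RUSTFLAGS so they apply to every crate in the build, including tokio and tracing themselves; that is what unlocks the unstable APIs used elsewhere in this commit. A minimal sketch of what such a cfg gate looks like in crate code (the function name is illustrative, not from this repo):

    // Compiled only when RUSTFLAGS carries --cfg tokio_unstable.
    #[cfg(tokio_unstable)]
    fn describe_runtime() -> &'static str {
        "tokio built with unstable APIs enabled"
    }

    #[cfg(not(tokio_unstable))]
    fn describe_runtime() -> &'static str {
        "stock tokio"
    }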

pulsebeam-runtime/src/sync/spmc.rs

Lines changed: 1 addition & 5 deletions
@@ -116,7 +116,6 @@ impl<T: Clone> Receiver<T> {

         // Closed and nothing left
         if self.ring.closed.load(Ordering::Acquire) == 1 && self.next_seq >= head {
-            coop.made_progress();
             return Poll::Ready(Err(RecvError::Closed));
         }

@@ -146,32 +145,29 @@ impl<T: Clone> Receiver<T> {
         if slot_seq < earliest {
             drop(slot);
             self.next_seq = head;
-            coop.made_progress();
             return Poll::Ready(Err(RecvError::Lagged(head)));
         }

         // Seq mismatch — producer overwrote after head snapshot
         if slot_seq != self.next_seq {
             drop(slot);
             self.next_seq = head;
-            coop.made_progress();
             return Poll::Ready(Err(RecvError::Lagged(head)));
         }

         // Valid message
         if let Some(v) = &slot.val {
             let out = v.clone();
             drop(slot);
-            self.next_seq += 1;
             coop.made_progress();
+            self.next_seq += 1;
             return Poll::Ready(Ok(out));
         }

         // This shouldn't never happen, but just in case..
         // Seq was correct but value missing — treat as lag
         drop(slot);
         self.next_seq = head;
-        coop.made_progress();
         return Poll::Ready(Err(RecvError::Lagged(head)));
     }
 }
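The net effect: coop.made_progress() is now called only on the path that actually yields a value, so Closed/Lagged returns no longer consume a unit of tokio's cooperative budget. A minimal sketch of the pattern, assuming tokio exposes the budget hooks as task::coop::poll_proceed plus the returned guard's made_progress() (recent tokio versions publish these under tokio::task::coop; older ones keep them internal). try_pop, T, and the error type are stand-ins for the ring internals, not the crate's real API:

    use std::task::{ready, Context, Poll};

    fn poll_recv<T>(
        cx: &mut Context<'_>,
        mut try_pop: impl FnMut() -> Option<T>,
    ) -> Poll<Result<T, ()>> {
        // Charge the task's cooperative budget up front; an exhausted
        // budget returns Pending and forces a yield to the scheduler.
        let coop = ready!(tokio::task::coop::poll_proceed(cx));
        match try_pop() {
            Some(v) => {
                // Only a successful receive counts as progress.
                coop.made_progress();
                Poll::Ready(Ok(v))
            }
            // Terminal paths return without marking progress, mirroring
            // the deletions in the diff above.
            None => Poll::Ready(Err(())),
        }
    }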

pulsebeam/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ tokio-metrics = { version = "0.4.5", features = ["metrics-rs-integration"] }
 once_cell = "1.21.3"
 ahash = "0.8.12"
 tokio-stream = { version = "0.1.17", features = ["sync"] }
+futures-concurrency = "7.6.3"

 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemallocator = { version = "0.6.0", features = ["profiling", "unprefixed_malloc_on_supported_platforms", "disable_initial_exec_tls"] }
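futures-concurrency is the crate behind the commit's "use futures concurrency grouping" bullet: it extends tuples and collections of futures/streams with structured join, race, and merge operations. A minimal sketch of stream merging (the streams here are illustrative, not the actor's real mailboxes):

    use futures_concurrency::prelude::*;
    use futures_lite::{stream, StreamExt};

    async fn merged_demo() {
        let rtp = stream::iter([1, 2, 3]);
        let rtcp = stream::iter([10, 20, 30]);
        // merge() polls both streams fairly and yields items as they
        // become ready, without spawning extra tasks.
        let mut all = (rtp, rtcp).merge();
        while let Some(n) = all.next().await {
            println!("{n}");
        }
    }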

pulsebeam/src/gateway/actor.rs

Lines changed: 17 additions & 13 deletions
@@ -1,5 +1,4 @@
 use crate::{entity::ParticipantId, gateway::demux::Demuxer};
-use futures_lite::StreamExt;
 use pulsebeam_runtime::actor::{ActorKind, ActorStatus, RunnerConfig};
 use pulsebeam_runtime::prelude::*;
 use pulsebeam_runtime::{actor, mailbox, net};
@@ -125,7 +124,7 @@ impl actor::Actor<GatewayMessageSet> for GatewayWorkerActor {
     ) -> Result<(), actor::ActorError> {
         pulsebeam_runtime::actor_loop!(self, ctx, pre_select: {},
             select: {
-                Ok(_) = self.socket.readable() => {
+                _ = self.socket.readable() => {
                     self.read_socket().await;
                 }
             });
@@ -165,34 +164,39 @@ impl GatewayWorkerActor {
     }

     async fn read_socket(&mut self) -> io::Result<()> {
-        let mut budget: usize = 256;
-        while budget > 0 {
+        const COOP_BUDGET: usize = 128;
+        let mut spent_budget: usize = 0;
+
+        loop {
             self.recv_batches.clear();
+
             match self
                 .socket
                 .try_recv_batch(&mut self.batcher, &mut self.recv_batches)
             {
                 Ok(_) => {
                     for batch in self.recv_batches.drain(..) {
-                        let count = if batch.stride > 0 {
+                        // Calculate logical packets (GRO awareness)
+                        let cost = if batch.stride > 0 {
                             std::cmp::max(1, batch.len / batch.stride)
                         } else {
                             1
                         };
-                        budget = budget.saturating_sub(count);
+
                         self.demuxer.demux(batch).await;
+
+                        spent_budget += cost;
+
+                        if spent_budget >= COOP_BUDGET {
+                            tokio::task::yield_now().await;
+                            spent_budget = 0;
+                        }
                     }
                 }
-                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
-                    // Socket empty, wait until ready again.
-                    return Ok(());
-                }
+                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(()),
                 Err(err) => return Err(err),
             }
         }
-
-        tokio::task::yield_now().await;
-        Ok(())
     }
 }
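Worked example of the cost accounting above: with GRO, one receive buffer can hold several wire packets laid out stride bytes apart, so the loop charges the coop budget per logical packet rather than per syscall. A standalone sketch of the same arithmetic:

    // Sketch of the cost math in the diff above: a GRO-coalesced batch
    // carries several wire packets in one buffer, `stride` bytes apart.
    fn logical_packets(len: usize, stride: usize) -> usize {
        if stride > 0 {
            std::cmp::max(1, len / stride)
        } else {
            1 // no GRO coalescing: one buffer == one packet
        }
    }

    fn main() {
        // A 6000-byte batch with a 1500-byte stride charges 4 units of the
        // 128-unit COOP_BUDGET, so ~32 such batches trigger a yield_now().
        assert_eq!(logical_packets(6000, 1500), 4);
        assert_eq!(logical_packets(800, 0), 1);
    }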

pulsebeam/src/main.rs

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ fn main() {

     let rt = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
+        // .disable_lifo_slot()
         .worker_threads(workers)
         .build()
         .unwrap();
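The commented-out disable_lifo_slot() points at tokio's LIFO-slot optimization, where the last-woken task runs next on the same worker and skips the run queue. Disabling it is an unstable Builder API, which is what the --cfg tokio_unstable added in the Makefile enables. A hedged sketch of how it would be wired up if re-enabled:

    use tokio::runtime::{Builder, Runtime};

    fn build_runtime(workers: usize) -> Runtime {
        let mut builder = Builder::new_multi_thread();
        builder.enable_all().worker_threads(workers);
        // Unstable API: trades the LIFO slot's wakeup-latency win for
        // fairer scheduling under sustained load.
        #[cfg(tokio_unstable)]
        builder.disable_lifo_slot();
        builder.build().unwrap()
    }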

pulsebeam/src/node.rs

Lines changed: 10 additions & 10 deletions
@@ -84,22 +84,22 @@ pub async fn run(
     );
     join_set.spawn(ignore(gateway_task));

-    let shard_count = 2 * workers;
-    let mut shard_handles = Vec::with_capacity(shard_count);
-
-    for i in 0..shard_count {
-        let (shard, shard_task) =
-            actor::prepare(shard::ShardActor::new(i), RunnerConfig::default());
-        join_set.spawn(ignore(shard_task));
-        shard_handles.push(shard);
-    }
+    // let shard_count = 2 * workers;
+    // let mut shard_handles = Vec::with_capacity(shard_count);
+    //
+    // for i in 0..shard_count {
+    //     let (shard, shard_task) =
+    //         actor::prepare(shard::ShardActor::new(i), RunnerConfig::default());
+    //     join_set.spawn(ignore(shard_task));
+    //     shard_handles.push(shard);
+    // }

     let rng = rand::Rng::from_os_rng();
     let node_ctx = NodeContext {
         rng,
         gateway,
         sockets,
-        shards: shard_handles,
+        shards: vec![],
         egress_counter: Arc::new(AtomicUsize::new(0)),
     };

pulsebeam/src/participant/actor.rs

Lines changed: 16 additions & 4 deletions
@@ -102,6 +102,8 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
         }

         tokio::select! {
+            biased;
+            // Priority 1: Control Messages
             res = ctx.sys_rx.recv() => {
                 match res {
                     Some(msg) => match msg {
@@ -114,21 +116,31 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
                 }
             }
             Some(msg) = ctx.rx.recv() => self.handle_control_message(msg).await,
-            Ok(_) = self.egress.writable(), if !self.core.batcher.is_empty() => {
-                self.core.batcher.flush(&self.egress);
-            },
+
+            // Priority 2: CPU Work
             // TODO: consolidate pollings in core
             Some((_, req)) = self.core.upstream.keyframe_request_streams.next() => {
                 self.core.handle_keyframe_request(req);
             }
             Some((meta, pkt)) = self.core.downstream.next() => {
                 self.core.handle_forward_rtp(meta, pkt);
-                // TODO: add batching back
+                // this indicates the first batch is filled.
+                if self.core.batcher.len() >= 2 {
+                    self.core.batcher.flush(&self.egress);
+                }
+
                 new_deadline = self.core.poll_rtc();
             },
             Some(batch) = gateway_rx.recv() => {
                 new_deadline = self.core.handle_udp_packet_batch(batch);
             },
+
+            // Priority 3: Flush to network
+            Ok(_) = self.egress.writable(), if !self.core.batcher.is_empty() => {
+                self.core.batcher.flush(&self.egress);
+            },
+
+            // Priority 4: Background tasks
             now = stats_interval.tick() => {
                 self.core.poll_stats(now);
             }
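The biased; keyword makes tokio::select! poll branches in source order instead of at random, so the branch order above doubles as a priority order: control messages, CPU work, network flush, then background tasks. A minimal standalone sketch of the pattern (the channels are hypothetical, not the actor's real mailboxes):

    use tokio::sync::mpsc;

    async fn prioritized(mut ctrl: mpsc::Receiver<u32>, mut data: mpsc::Receiver<u32>) {
        loop {
            tokio::select! {
                // With `biased`, branches are polled top to bottom, so
                // control traffic always wins a tie against data traffic.
                biased;
                Some(c) = ctrl.recv() => println!("control: {c}"),
                Some(d) = data.recv() => println!("data: {d}"),
                else => break, // both channels closed
            }
        }
    }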

pulsebeam/src/participant/batcher.rs

Lines changed: 4 additions & 0 deletions
@@ -69,6 +69,10 @@ impl Batcher {
         self.free_states.push(state);
     }

+    pub fn len(&self) -> usize {
+        self.active_states.len()
+    }
+
     pub fn flush(&mut self, socket: &net::UnifiedSocket) {
         while let Some(state) = self.front() {
             let res = socket.try_send_batch(&net::SendPacketBatch {
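len() is what lets the participant loop flush eagerly once a second batch starts filling, rather than waiting for the egress socket to report writable. A sketch of the heuristic in isolation (the struct is a stand-in mirroring the real Batcher's surface; the select guard in participant/actor.rs also relies on an is_empty()):

    struct Batcher {
        active_states: Vec<Vec<u8>>,
    }

    impl Batcher {
        fn len(&self) -> usize {
            self.active_states.len()
        }
        fn is_empty(&self) -> bool {
            self.active_states.is_empty()
        }
    }

    fn should_flush_eagerly(b: &Batcher) -> bool {
        // Two active states mean the first batch is full; flushing it now
        // keeps its packets hot in cache (the commit's "improve cpu cache
        // by egress batching better").
        b.len() >= 2
    }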

pulsebeam/src/room.rs

Lines changed: 17 additions & 9 deletions
@@ -1,10 +1,11 @@
 use std::{collections::HashMap, sync::Arc, time::Duration};

 use pulsebeam_runtime::{
-    actor::{ActorKind, RunnerConfig},
+    actor::{ActorKind, ActorStatus, RunnerConfig},
     prelude::*,
 };
 use str0m::Rtc;
+use tokio::task::JoinSet;

 use crate::{
     entity::{ParticipantId, RoomId, TrackId},
@@ -48,6 +49,8 @@ pub struct RoomActor {
     // participant_factory: Box<dyn actor::ActorFactory<participant::ParticipantActor>>,
     room_id: Arc<RoomId>,
     state: RoomState,
+
+    participant_tasks: JoinSet<(Arc<ParticipantId>, ActorStatus)>,
 }

 #[derive(Default, Clone, Debug)]
@@ -80,6 +83,9 @@ impl actor::Actor<RoomMessageSet> for RoomActor {
     ) -> Result<(), actor::ActorError> {
         pulsebeam_runtime::actor_loop!(self, ctx, pre_select: {},
             select: {
+                Some(Ok((participant_id, _))) = self.participant_tasks.join_next() => {
+                    self.handle_participant_left(participant_id).await;
+                }
                 _ = tokio::time::sleep(EMPTY_ROOM_TIMEOUT), if self.state.participants.is_empty() => {
                     tracing::info!("room has been empty for: {EMPTY_ROOM_TIMEOUT:?}, exiting.");
                     break;
@@ -120,6 +126,7 @@ impl RoomActor {
             node_ctx,
             room_id,
             state: RoomState::default(),
+            participant_tasks: JoinSet::new(),
         }
     }

@@ -156,15 +163,16 @@ impl RoomActor {

         let (mut participant_handle, participant_task) =
             actor::prepare(participant_actor, RunnerConfig::default());
-        let mut room_handle = ctx.handle.clone();
-        let participant_task = async move {
-            let (participant_id, _) = participant_task.await;
-            room_handle
-                .send(RoomMessage::RemoveParticipant(participant_id))
-                .await;
-        };

-        self.schedule(participant_task.boxed()).await;
+        self.participant_tasks.spawn(participant_task);
+
+        // let participant_task = async move {
+        //     let (participant_id, _) = participant_task.await;
+        //     room_handle
+        //         .send(RoomMessage::RemoveParticipant(participant_id))
+        //         .await;
+        // };
+        // self.schedule(participant_task.boxed()).await;
         self.state.participants.insert(
             participant_id.clone(),
             ParticipantMeta {
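Participant exits now surface through the room's own JoinSet instead of a self-addressed RemoveParticipant message: join_next() resolves as each spawned participant task completes, carrying its id and final status. A minimal standalone sketch of that supervision shape (the (id, status) tuple stands in for the (Arc<ParticipantId>, ActorStatus) payload in room.rs):

    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        let mut tasks: JoinSet<(u32, &'static str)> = JoinSet::new();
        for id in 0..3 {
            tasks.spawn(async move { (id, "exited") });
        }
        // join_next() yields None once the set is empty, ending supervision.
        while let Some(Ok((id, status))) = tasks.join_next().await {
            println!("participant {id} {status}");
        }
    }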
