A custom async executor for Rust with CPU affinity and NUMA awareness support, designed for CPU-heavy workloads.
- Thread Pool Executor: Manages a pool of thread-affine executors, one per core
- CPU Affinity: Pin tasks to specific CPU cores for better cache locality
- NUMA Awareness: Detect and utilize NUMA topology for optimal memory access patterns
- Bidirectional Channels: Custom channel implementations (Bichannel, Hookshot)
- Task Cancellation: Graceful shutdown support with TaskWithShutdown
This executor is designed to efficiently handle CPU-intensive async tasks by:
- Pinning threads to specific CPU cores
- Allowing explicit task placement on specific cores or NUMA nodes
- Supporting round-robin task distribution (globally or per NUMA node)
- Minimizing cross-NUMA memory access for better performance
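The two round-robin modes listed above can be illustrated with a small standard-library sketch. It is not this crate's implementation: the `Placement` type, its fields, and its method names are hypothetical, and the topology is passed in by hand rather than detected.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical placement helper: `nodes[n]` lists the core IDs on NUMA node `n`.
pub struct Placement {
    nodes: Vec<Vec<usize>>,
    all_cores: Vec<usize>,
    global_next: AtomicUsize,
    node_next: Vec<AtomicUsize>,
}

impl Placement {
    pub fn new(nodes: Vec<Vec<usize>>) -> Self {
        // Flatten the per-node lists into one list for global round-robin.
        let all_cores: Vec<usize> = nodes.iter().flatten().copied().collect();
        // One independent counter per NUMA node.
        let node_next = nodes.iter().map(|_| AtomicUsize::new(0)).collect();
        Self {
            nodes,
            all_cores,
            global_next: AtomicUsize::new(0),
            node_next,
        }
    }

    /// Global round-robin: cycle through every core regardless of node.
    pub fn next_core(&self) -> Option<usize> {
        if self.all_cores.is_empty() {
            return None;
        }
        let i = self.global_next.fetch_add(1, Ordering::Relaxed) % self.all_cores.len();
        Some(self.all_cores[i])
    }

    /// Per-node round-robin: cycle only through the cores of `node`.
    /// Returns `None` for an unknown or empty node.
    pub fn next_core_on_node(&self, node: usize) -> Option<usize> {
        let cores = self.nodes.get(node)?;
        if cores.is_empty() {
            return None;
        }
        let i = self.node_next[node].fetch_add(1, Ordering::Relaxed) % cores.len();
        Some(cores[i])
    }
}
```

Keeping a separate counter per node means a burst of tasks placed on one node does not skew the rotation on the others.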
use core_executor::ThreadPoolExecutor;
// Create an executor with 8 cores
let mut executor = ThreadPoolExecutor::new(8);
// Spawn a task on a specific core
let future = executor.spawn_on_core(0, async {
    // CPU-intensive work here
    42
});
// Spawn a task on any available core (round-robin)
let future = executor.spawn_on_any(async {
    // Work that can run on any core
    "result"
});
// Block and wait for results
let result = futures_lite::future::block_on(future);

use core_executor::ThreadPoolExecutor;
// Create a NUMA-aware executor (automatically detects topology)
let mut executor = ThreadPoolExecutor::new_numa_aware()?;
// Spawn a task on a specific NUMA node
let future = executor.spawn_on_numa_node(0, async {
    // Work that benefits from NUMA locality
    compute_intensive_task()
})?;
// Check NUMA topology
if let Some(topology) = executor.numa_topology() {
    println!("System has {} NUMA nodes", topology.num_nodes());
    println!("Total cores: {}", topology.total_cores);
}

use core_executor::ThreadAffineSpawner;
use std::time::Duration;
// Spawn a long-running task with graceful shutdown
spawner.spawn_with_shutdown(|shutdown| async move {
    loop {
        // Do work...
        // Check if shutdown was requested
        if shutdown.should_exit() {
            break;
        }
        // Continue processing...
        smol::Timer::after(Duration::from_millis(100)).await;
    }
});

use core_executor::channel::Bichannel;
// Create a bounded bidirectional channel
let (left, right): (Bichannel<String, i32>, Bichannel<i32, String>) =
    Bichannel::bounded(10);
// Send from left to right
left.send("Hello".to_string()).await?;
let msg = right.recv().await?;
// Send back from right to left
right.send(42).await?;
let reply = left.recv().await?;

use core_executor::channel::Hookshot;
// Create a one-shot bidirectional channel
let (sender, receiver) = Hookshot::<String, i32>::new();
// Send a single message
sender.send_blocking("one-time message".to_string())?;
let msg = receiver.recv().await?;
// Send reply back
receiver.send_back(42).await?;
let reply = sender.recv_back().await?;

- new(cores: usize) - Create executor with specified number of cores
- new_numa_aware() - Create NUMA-aware executor with auto-detected topology
- spawn_on_core(core_id, task) - Spawn task on specific CPU core
- spawn_on_any(task) - Spawn task on any available core (round-robin)
- spawn_on_numa_node(node_id, task) - Spawn task on specific NUMA node
- numa_topology() - Get NUMA topology information if available
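To make the idea of placing work on a specific core concrete, here is a minimal standard-library sketch of a pool with one worker thread and one job queue per core. It is an illustration under stated assumptions, not this crate's design: `MiniPool` is a hypothetical name, jobs are blocking closures rather than futures, results come back over an `mpsc` receiver, and the threads are not actually pinned (a real executor would set the affinity when each worker starts).

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical pool: one worker thread per "core", each with its own queue.
pub struct MiniPool {
    queues: Vec<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl MiniPool {
    pub fn new(cores: usize) -> Self {
        let mut queues = Vec::with_capacity(cores);
        let mut workers = Vec::with_capacity(cores);
        for _ in 0..cores {
            let (tx, rx) = mpsc::channel::<Job>();
            // Assumption: a real executor would pin this thread to its core here.
            workers.push(thread::spawn(move || {
                // Runs until every sender for this queue has been dropped.
                for job in rx {
                    job();
                }
            }));
            queues.push(tx);
        }
        Self { queues, workers }
    }

    /// Run `f` on the worker for `core_id`. The result arrives on the returned
    /// receiver. Returns `None` if `core_id` is out of range.
    pub fn spawn_on_core<T, F>(&self, core_id: usize, f: F) -> Option<Receiver<T>>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let queue = self.queues.get(core_id)?;
        let (result_tx, result_rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // The send fails only if the caller dropped the receiver; ignore that.
            let _ = result_tx.send(f());
        });
        queue.send(job).ok()?;
        Some(result_rx)
    }
}

impl Drop for MiniPool {
    fn drop(&mut self) {
        // Dropping the senders closes the queues, which ends each worker loop.
        self.queues.clear();
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}
```

Because each core has its own queue, two jobs sent to the same `core_id` always run on the same thread, which is the property that makes cache locality possible once the thread is pinned.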
- spawn(task) - Spawn a task and get a future for its result
- fire(task) - Fire-and-forget task execution
- spawn_with_shutdown(task_fn) - Spawn with graceful shutdown support
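The cooperative shutdown pattern behind spawn_with_shutdown can be sketched with only the standard library. This is an assumption about the general technique, not the crate's internals: `ShutdownFlag`, `request`, and `spawn_with_flag` are hypothetical names, and a plain thread stands in for an async task.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for the shutdown token handed to a task.
#[derive(Clone, Default)]
pub struct ShutdownFlag(Arc<AtomicBool>);

impl ShutdownFlag {
    /// Ask the task to stop at its next check.
    pub fn request(&self) {
        self.0.store(true, Ordering::Release);
    }

    /// Polled by the task between units of work.
    pub fn should_exit(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

/// Start a worker loop. Returns the flag that stops it and a handle that
/// yields the number of completed iterations.
pub fn spawn_with_flag() -> (ShutdownFlag, JoinHandle<u64>) {
    let flag = ShutdownFlag::default();
    let worker_flag = flag.clone();
    let handle = thread::spawn(move || {
        let mut iterations = 0u64;
        loop {
            // Do one unit of work.
            iterations += 1;
            // Check for a shutdown request before continuing.
            if worker_flag.should_exit() {
                break;
            }
            thread::sleep(Duration::from_millis(1));
        }
        iterations
    });
    (flag, handle)
}
```

The task is never interrupted mid-iteration; it stops only at the point where it checks the flag, so how quickly shutdown takes effect depends on how often the loop polls.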
- ExecutorError - Main error type for executor operations
- BichannelError - Errors from Bichannel operations
- HookshotError - Errors from Hookshot operations
- async-executor: Local executor for each thread
- core_affinity: CPU affinity control
- hwlocality: NUMA topology detection and management
- async-channel, async-oneshot: Channel primitives
- wat-cpu: Helper for getting current CPU ID
This project is dual-licensed under MIT OR Apache-2.0.