
[CELEBORN-2261] Parallelize reserve slots #3598

Open
eolivelli wants to merge 2 commits into apache:main from eolivelli:CELEBORN-2261-apache

Conversation

@eolivelli
Contributor

@eolivelli eolivelli commented Feb 9, 2026

What changes were proposed in this pull request?

Add a new configuration, celeborn.worker.reserve.slots.io.threads (default 1), to set the number of threads used to create files when reserving slots.

celeborn.worker.reserve.slots.io.threads = 0 means the number of threads is derived from the number of available CPUs.
To keep the same behavior as 0.6.0, set celeborn.worker.reserve.slots.io.threads to 1.
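
For illustration, a minimal entry for Celeborn's defaults file (assuming the space-separated key value format and the key name as proposed in this PR; a reviewer later suggests renaming it to celeborn.worker.reserveSlots.io.threads):

celeborn.worker.reserve.slots.io.threads 8

With 8 threads, up to eight file creations can be in flight at once while slots are reserved; 0 derives the pool size from the available CPUs.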

Why are the changes needed?

When using S3, the createPartitionDataWriter method can be slow because it creates files on S3. This is pure I/O and can be parallelized to speed up the bootstrap of the shuffle operation.

Does this PR resolve a correctness bug?

No

Does this PR introduce any user-facing change?

Yes

It adds a new configuration celeborn.worker.reserve.slots.io.threads.
The default behavior is consistent with the previous version.

How was this patch tested?

CI and Manual testing

@Dzeri96

Dzeri96 commented Feb 9, 2026

I can't really review this code, but I can give my two cents:

  1. Parallelizing so high in the call chain makes it hard to know if the called code will produce any race conditions. Since we are in the controller, I guess the method can be called asynchronously anyway, so you won't break anything that isn't already broken, but I think it would be better to encapsulate only the time-sensitive part of the code and return a Future from that function, rather than parallelize everything.

  2. Maybe factor out the duplicate code block that assigns primary and replica slots?

  3. I don't know enough to be sure, but it seems to me that this method should maybe queue up requests instead of executing them greedily. If the worker gets many handleReserveSlots requests, they might fight with each other and could all end up failing. I guess the master should prevent this.

@SteNicholas SteNicholas changed the title CELEBORN-2261 Parallelize Reserve Slow in the Worker [CELEBORN-2261] Parallelize reserve slots Feb 10, 2026
@eolivelli
Contributor Author

@Dzeri96

Parallelizing so high in the call chain makes it hard to know if the called code will produce any race conditions. Since we are in the controller, I guess the method can be called asynchronously anyway, so you won't break anything that isn't already broken, but I think it would be better to encapsulate only the time-sensitive part of the code and return a Future from that function, rather than parallelize everything.

In this case it is not a problem: we know the code that runs inside the parallelized blocks, and in this specific case it only creates files with a predictable name (the name is part of the PartitionLocation, see here)

I think it would be better to encapsulate only the time-sensitive part of the code and return a Future from that function, rather than parallelize everything.

This is basically what we are doing now:

  • Creating files is very fast for MEMORY and DISK, and the file name is predictable. Parallelizing it is not a problem.
  • For S3 (and I expect the same for all the other DFSs) creating the file is very slow, and it is mostly waiting for a response from S3. Sending N requests in parallel does not add work on the local CPU (see the sketch below).
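
A minimal sketch of the idea, with hypothetical names (createWriter stands in for the real createPartitionDataWriter, and plain strings for PartitionLocation); this is not the PR's exact code:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object ReserveSlotsSketch {
  def main(args: Array[String]): Unit = {
    val ioThreads = 4 // would come from celeborn.worker.reserve.slots.io.threads
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(ioThreads))

    // Placeholder for the slow, I/O-only file creation (e.g. an S3 request).
    def createWriter(location: String): String = location

    val locations = List("partition-0", "partition-1", "partition-2")
    // Fire all creations in parallel instead of waiting for each one in a loop.
    val all = Future.sequence(locations.map(l => Future(createWriter(l))))
    println(Await.result(all, 30.seconds))
    ec.shutdown()
  }
}

The pool size only bounds how many requests are outstanding at the same time, which matters when each creation is dominated by network latency (S3) rather than local CPU work.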

but it seems to me that this method should maybe queue up requests instead of executing them greedily? If the worker gets many handleReserveSlots requests

This is indeed a possible problem, and this patch mitigates it because the handleReserveSlots request takes less time to complete.

Ideally I would like to make handleReserveSlots fully asynchronous and offload it from the Netty event loop, but that can be done as separate work.
In any case we should not loop over N PartitionLocations and wait for each creation sequentially.

@eolivelli
Contributor Author

Maybe factor out the duplicate code block that assigns primary and replica slots?

This is a general problem in some classes of this code base; I don't want to add unrelated changes here.

@eolivelli
Contributor Author

Unfortunately my code does not build with Scala 2.13; let me solve the problem.

[INFO] compiling 25 Scala sources and 42 Java sources to /home/runner/work/celeborn/celeborn/worker/target/classes ...
Error:  /home/runner/work/celeborn/celeborn/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:235: type mismatch;
 found   : Nothing => Nothing
 required: scala.collection.BuildFrom[IndexedSeq[scala.concurrent.Future[org.apache.celeborn.common.protocol.PartitionLocation]],org.apache.celeborn.common.protocol.PartitionLocation,?]
Error:  /home/runner/work/celeborn/celeborn/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:237: value asJava is not a member of Any
Error:  /home/runner/work/celeborn/celeborn/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:239: type mismatch;
 found   : Any
 required: Throwable
Error:  /home/runner/work/celeborn/celeborn/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:284: type mismatch;
 found   : Nothing => Nothing
 required: scala.collection.BuildFrom[IndexedSeq[scala.concurrent.Future[org.apache.celeborn.common.protocol.PartitionLocation]],org.apache.celeborn.common.protocol.PartitionLocation,?]
Error:  /home/runner/work/celeborn/celeborn/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:286: value asJava is not a member of Any
Error:  /home/runner/work/celeborn/celeborn/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:288: type mismatch;
 found   : Any
 required: Throwable
Error: [ERROR] 6 errors found
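
For context, these errors come from a Future.sequence call over an IndexedSeq whose result is then converted with asJava: the implicit builder argument of Future.sequence changed from CanBuildFrom (Scala 2.12) to BuildFrom (Scala 2.13), so code that satisfies it explicitly on one version tends to break on the other, and once the result type collapses to Any the asJava conversion fails too. A version-agnostic sketch, with hypothetical names and plain strings standing in for PartitionLocation, that lets the compiler resolve the implicit on both versions:

import java.util.{List => JList}
import scala.collection.JavaConverters._ // compiles on 2.12 and 2.13 (deprecated on 2.13)
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CrossBuildSketch {
  // Sequence a plain List so the implicit builder is resolved the same way on
  // both Scala versions, then convert to a java.util.List for the protocol layer.
  def reserveAll(locations: List[String]): JList[String] = {
    val futures: List[Future[String]] = locations.map(l => Future(l)) // placeholder work
    Await.result(Future.sequence(futures), 30.seconds).asJava
  }
}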

@eolivelli
Contributor Author

I still have some problems making it build with both Scala 2.12 and Scala 2.13.

@eolivelli
Contributor Author

@Dzeri96 in the end I made it all asynchronous; the thread no longer blocks waiting for the results.

Motivation:
When using S3 the createPartitionDataWriter method can be slow, because it creates files on S3.
This is only I/O and it can be parallelized to speed up the bootstrap of the shuffle operation.

Changes:
Add new configuration celeborn.worker.reserve.slots.io.threads (default 1) to set the
number of threads to use to create the files when reserving the slots.

Zero means to use a number computed from the number of available CPUs.
@eolivelli
Contributor Author

I have solved all the compilation issues

@eolivelli
Contributor Author

@pan3793 @SteNicholas could you please take a look?
This patch is necessary to get good performance when using S3 and the Spark job has many small tasks that create a lot of slots.

Member

@pan3793 pan3793 left a comment


I'm not against adding this feature; I left some general comments. @waitinfuture do you have any concerns in this direction?

.createWithDefault(2)

val RESERVE_SLOTS_IO_THREAD_POOL_SIZE: ConfigEntry[Int] =
  buildConf("celeborn.worker.reserve.slots.io.threads")
Member


dot for namespaces, camelCase for words

Suggested change
buildConf("celeborn.worker.reserve.slots.io.threads")
buildConf("celeborn.worker.reserveSlots.io.threads")

shutdown = worker.shutdown

reserveSlotsThreadPool =
  Executors.newFixedThreadPool(conf.workerReserveSlotsIoThreadPoolSize).asInstanceOf[
Member


use Celeborn's util to create a thread pool instead of raw j.u.c. classes, so that the daemon flag, thread name prefix, exception handler, etc. are set properly.
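
To illustrate what that buys, a generic sketch (not Celeborn's actual helper, whose name and signature are not shown here): a pool whose threads get an explicit daemon policy, a recognizable name prefix, and an uncaught-exception handler instead of dying silently.

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object NamedPoolSketch {
  def namedPool(nThreads: Int, prefix: String, daemon: Boolean): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(daemon) // daemon policy is explicit instead of the JVM default
        t.setUncaughtExceptionHandler((_: Thread, e: Throwable) => e.printStackTrace()) // surface failures
        t
      }
    }
    Executors.newFixedThreadPool(nThreads, factory)
  }
}

The reserve-slots pool would then be created with something like namedPool(conf.workerReserveSlotsIoThreadPoolSize, "reserve-slots-io", daemon = true), adjusting the daemon policy to whatever the project convention is.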

Member


maybe use ThreadUtils.sameThreadExecutionContext for threads = 1.

when something goes wrong, we will lose the full stack trace if we run the task in another thread.
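
A sketch of that suggestion with a hand-rolled same-thread executor, so it does not assume the exact ThreadUtils API: when only one I/O thread is configured, run the work in the caller's thread and keep the caller's full stack trace on failure; otherwise use a dedicated pool.

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object ReserveSlotsEcSketch {
  def reserveSlotsContext(ioThreads: Int): ExecutionContext =
    if (ioThreads <= 1) ExecutionContext.fromExecutor((r: Runnable) => r.run()) // same thread as the caller
    else ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ioThreads)) // dedicated pool
}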

} else {
  val timeToReserveLocations = System.currentTimeMillis() - startTime;
  logInfo(
    s"Reserved ${tasks.size} slots for $shuffleKey in $timeToReserveLocations ms (with ${conf.workerReserveSlotsIoThreadPoolSize} threads)")
Member


evaluating ${conf.workerReserveSlotsIoThreadPoolSize} is not free, especially on the hot path, so materialize it to avoid paying the evaluation cost every time
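
A sketch of the suggestion, with hypothetical names: read the configuration once when the Controller is constructed and reuse the cached value on the hot path.

object MaterializedConfSketch {
  trait ConfSketch { def workerReserveSlotsIoThreadPoolSize: Int } // stand-in for the real conf

  class ControllerSketch(conf: ConfSketch) {
    // Evaluated once at construction time, not on every reserve-slots request.
    private val reserveSlotsIoThreads: Int = conf.workerReserveSlotsIoThreadPoolSize

    def reservedMessage(slots: Int, shuffleKey: String, elapsedMs: Long): String =
      s"Reserved $slots slots for $shuffleKey in $elapsedMs ms (with $reserveSlotsIoThreads threads)"
  }
}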

@SteNicholas
Member

@eolivelli, thanks for the contribution. I created a similar pull request for parallelizing reserve slots in #3387. You may want to take a look.

