[CELEBORN-2261] Parallelize reserve slots#3598
eolivelli wants to merge 2 commits into apache:main
9226b38 to ac155e3
|
I can't really review this code, but I can give my two cents:
|
In this case it is not a problem: we know the code that runs in the blocks, and in this specific case it only creates files with a predictable name (this is part of the PartitionLocation, see here).
This is basically what we are doing now:
This is actually a possible problem, and this patch mitigates it, because the handleReserveSlot request will take less time to complete. Ideally I would like to make handleReserveSlot fully async and offload it from the Netty event loop, but this can be done in separate work.
This is a general problem in some classes in this code base; I don't want to add unrelated changes.
|
Unfortunately my code does not build with Scala 2.13, let me solve the problem.
|
I still have some problems making it build with both Scala 2.12 and Scala 2.13.
|
@Dzeri96 in the end I made everything asynchronous, so the thread no longer blocks waiting for the results.
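To illustrate the non-blocking approach described in this comment, here is a minimal sketch. It is not the code from this PR: `createWriter`, `reserveAll` and the object name are hypothetical stand-ins, and the pool size of 4 is arbitrary. The idea is that each IO-bound creation runs as its own `Future` and the caller registers a callback instead of waiting.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object AsyncReserveSketch {
  // Hypothetical stand-in for the slow, IO-bound writer creation.
  def createWriter(location: String): String = {
    Thread.sleep(50) // simulate remote storage latency
    s"writer-for-$location"
  }

  // Starts one task per location and returns immediately.
  // The returned Future completes when every task has finished,
  // or fails if any of them fails.
  def reserveAll(locations: Seq[String])(implicit ec: ExecutionContext): Future[Seq[String]] =
    Future.sequence(locations.map(loc => Future(createWriter(loc))))

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4) // arbitrary size for the demo
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    // The reply is produced in the callback, so the calling thread is never parked.
    reserveAll(Seq("p0", "p1", "p2")).onComplete { result =>
      result match {
        case Success(writers) => println(s"Reserved ${writers.size} slots")
        case Failure(e) => println(s"Reservation failed: ${e.getMessage}")
      }
      pool.shutdown()
    }
    println("Calling thread continues without waiting")
  }
}
```

In a real RPC handler the callback would send the response back to the client; how that is wired into Celeborn's RPC layer is outside the scope of this sketch.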
Motivation: When using S3 the createPartitionDataWriter method can be slow, because it creates files on S3. This is only IO and it can be parallelized to speed up the bootstrap of the shuffle operation.
Changes: Add new configuration celeborn.worker.reserve.slots.io.threads (default 1) to set the number of threads used to create the files when reserving the slots. Zero means to use a number computed from the number of available CPUs.
ed3f818 to 55510ef
|
I have solved all the compilation issues.
|
@pan3793 @SteNicholas could you please take a look?
pan3793
left a comment
I'm not against adding this feature; I have left some general comments. @waitinfuture do you have concerns about this direction?
      .createWithDefault(2)

  val RESERVE_SLOTS_IO_THREAD_POOL_SIZE: ConfigEntry[Int] =
    buildConf("celeborn.worker.reserve.slots.io.threads")
Use dots for namespaces and camelCase for multi-word names.
Suggested change:
    - buildConf("celeborn.worker.reserve.slots.io.threads")
    + buildConf("celeborn.worker.reserveSlots.io.threads")
    shutdown = worker.shutdown

    reserveSlotsThreadPool =
      Executors.newFixedThreadPool(conf.workerReserveSlotsIoThreadPoolSize).asInstanceOf[
Use Celeborn's utility to create the thread pool instead of raw java.util.concurrent classes, so that the daemon flag, name prefix, exception handler, etc. are set properly.
Maybe use ThreadUtils.sameThreadExecutionContext for threads = 1.
When something goes wrong, we will lose the full stack trace if we run the task in another thread.
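The two review points above can be sketched with plain JDK and Scala standard library classes. This does not use Celeborn's own thread utilities, whose exact signatures are not shown in this conversation. All names below are hypothetical, and marking the threads as daemon is an assumption made for the example.

```scala
import java.util.concurrent.{Executor, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

object ReserveSlotsExecutors {
  // Thread factory that sets a name prefix, the daemon flag and an
  // uncaught-exception handler, which a raw newFixedThreadPool(n) does not do.
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true) // assumption: daemon threads are wanted here
      t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
        override def uncaughtException(thread: Thread, e: Throwable): Unit =
          System.err.println(s"Uncaught exception in ${thread.getName}: $e")
      })
      t
    }
  }

  // Executor that runs every task directly on the calling thread,
  // so a failure keeps the caller's frames in its stack trace.
  private val sameThread: Executor = new Executor {
    override def execute(command: Runnable): Unit = command.run()
  }

  // With a single thread there is nothing to parallelize, so stay on the caller's thread.
  def executionContextFor(threads: Int, prefix: String): ExecutionContext =
    if (threads <= 1) ExecutionContext.fromExecutor(sameThread)
    else ExecutionContext.fromExecutor(
      Executors.newFixedThreadPool(threads, namedDaemonFactory(prefix)))
}
```

The sketch does not expose the pool for shutdown; a real worker would keep a reference to it and stop it together with its other resources.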
    } else {
      val timeToReserveLocations = System.currentTimeMillis() - startTime;
      logInfo(
        s"Reserved ${tasks.size} slots for $shuffleKey in $timeToReserveLocations ms (with ${conf.workerReserveSlotsIoThreadPoolSize} threads)")
Evaluating ${conf.workerReserveSlotsIoThreadPoolSize} is not free, especially on the hot path, so materialize it once in a val instead of paying the evaluation cost on every call.
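A minimal sketch of what materializing the value could look like. The `Conf` and `Handler` classes are hypothetical and only mimic a config accessor that performs a lookup and a parse on each call; the `lookups` counter exists purely to make the cost visible.

```scala
object MaterializeConfigSketch {
  // Hypothetical config class: every call to the accessor does a map lookup and a parse.
  class Conf(settings: Map[String, String]) {
    var lookups = 0
    def reserveSlotsIoThreads: Int = {
      lookups += 1
      settings.getOrElse("celeborn.worker.reserve.slots.io.threads", "1").toInt
    }
  }

  class Handler(conf: Conf) {
    // Evaluated once at construction time and reused on the hot path.
    private val reserveSlotsIoThreads: Int = conf.reserveSlotsIoThreads

    def logLine(shuffleKey: String, slots: Int, elapsedMs: Long): String =
      s"Reserved $slots slots for $shuffleKey in $elapsedMs ms (with $reserveSlotsIoThreads threads)"
  }
}
```

However many times `logLine` is called, the accessor runs only once per `Handler` instance.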
|
@eolivelli, thanks for the contribution. I have created a similar pull request for parallelizing reserve slots in #3387. You may want to take a look.
What changes were proposed in this pull request?
Add new configuration celeborn.worker.reserve.slots.io.threads (default 1) to set the number of threads used to create the files when reserving the slots.
celeborn.worker.reserve.slots.io.threads=0 means to use a number computed from the number of available CPUs.
In order to have the same behavior as in 0.6.0 you can set celeborn.worker.reserve.slots.io.threads to 1.
Why are the changes needed?
When using S3 the createPartitionDataWriter method can be slow, because it creates files on S3. This is only IO and it can be parallelized to speed up the bootstrap of the shuffle operation.
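As a rough illustration of the configuration semantics described above, the sketch below resolves the thread count and runs IO-bound creations on a bounded pool. It is not the PR's implementation. The PR description only says that zero means a number computed from the available CPUs, so using `availableProcessors()` directly, and treating negative values like zero, are assumptions. `createFile` and the other names are hypothetical.

```scala
import java.util.concurrent.{Callable, Executors}

object ReserveSlotsIoSketch {
  // Positive values are used as-is; zero (and, by assumption, negative values)
  // fall back to a number derived from the available CPUs.
  def resolveThreads(configured: Int): Int =
    if (configured > 0) configured
    else math.max(1, Runtime.getRuntime.availableProcessors())

  // Hypothetical IO-bound step, e.g. creating a file on remote storage.
  def createFile(name: String): String = {
    Thread.sleep(100) // simulate storage latency
    name
  }

  // Submits every creation to a fixed pool and collects the results in input order.
  def createAll(names: Seq[String], configuredThreads: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(resolveThreads(configuredThreads))
    try {
      // toList forces all submissions before the first get() is called.
      val futures = names.toList.map { n =>
        pool.submit(new Callable[String] {
          override def call(): String = createFile(n)
        })
      }
      futures.map(_.get())
    } finally {
      pool.shutdown()
    }
  }
}
```

With one thread the creations run one after another, which matches the description of the default; with more threads the simulated latencies overlap.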
Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
Yes
It adds a new configuration celeborn.worker.reserve.slots.io.threads.
The default behavior is consistent with the previous version.
How was this patch tested?
CI and Manual testing