[CELEBORN-2257] Fix remote disks not being reported on registration #3597
Dzeri96 wants to merge 2 commits into apache:main
Pull request overview
Fixes worker registration disk reporting so the master can see remote storage (HDFS/S3/OSS) immediately (before the first heartbeat), and refactors disk snapshot APIs / slot-allocation logic to distinguish local vs remote disks more clearly.
Changes:
- Renamed disk snapshot / healthy-dir helpers to explicitly mean “local” and added an “all disks” snapshot.
- Updated worker registration/heartbeat disk reporting to incorporate remote disks.
- Simplified master slot-allocation filtering by embedding disk-type metadata into StorageInfo.Type and using it in allocation logic.
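The third change can be illustrated with a minimal sketch of an enum that carries its own disk-type metadata. Everything here is an assumption made for illustration: the name `DiskTypeSketch`, the mask values, and the `isAvailable(int)` signature are hypothetical and are not taken from Celeborn's actual `StorageInfo.Type`.

```java
// Hypothetical sketch: each constant carries a DFS flag and a bit mask,
// so allocation code can filter disks without a separate lookup table.
// Mask values are illustrative assumptions, not Celeborn's real constants.
enum DiskTypeSketch {
  MEMORY(0, false, 0b00001),
  HDD(1, false, 0b00010),
  SSD(2, false, 0b00010), // assumed: local disks share one mask
  HDFS(3, true, 0b00100),
  OSS(4, true, 0b01000),
  S3(5, true, 0b10000);

  final int value;
  final boolean isDFS;
  final int mask;

  DiskTypeSketch(int value, boolean isDFS, int mask) {
    this.value = value;
    this.isDFS = isDFS;
    this.mask = mask;
  }

  // Assumed semantics: a type is available when its bit is set
  // in the bit set of enabled storage types.
  boolean isAvailable(int availableTypes) {
    return (availableTypes & mask) != 0;
  }
}
```

With the assumed masks, `DiskTypeSketch.HDFS.isAvailable(0b00110)` is true and `DiskTypeSketch.S3.isAvailable(0b00110)` is false, so a filter over disks reduces to one method call per disk.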
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala | Updates mocks to the renamed localDisksSnapshot() API. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala | Introduces localDisksSnapshot() / allDisksSnapshot() and renames “healthy working dirs” to local-only. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala | Switches registration to report all disks; refactors heartbeat disk update flow. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala | Uses local-only healthy working dirs check for slot reservation. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala | Updates test to use localDisksSnapshot(). |
| master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java | Simplifies disk filtering using StorageInfo.Type metadata; refactors usable-slot bookkeeping. |
| common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala | Refactors slot recomputation / propagation logic and uses isDFS. |
| common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java | Adds isDFS + mask metadata into StorageInfo.Type and introduces isAvailable(...). |
Codecov Report
❌ Patch coverage is
Additional details and impacted files

| Coverage Diff | main | #3597 | +/- |
|---|---|---|---|
| Coverage | 67.13% | 67.07% | -0.06% |
| Files | 357 | 357 | |
| Lines | 21860 | 21935 | +75 |
| Branches | 1943 | 1947 | +4 |
| Hits | 14674 | 14711 | +37 |
| Misses | 6166 | 6213 | +47 |
| Partials | 1020 | 1011 | -9 |
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
    private val diskInfos = storageManager
      .allDisksSnapshot()
      .map { diskInfo => diskInfo.mountPoint -> diskInfo }
      .toMap.asJava
This PR changes worker registration/heartbeat disk reporting to include remote disks (allDisksSnapshot) and introduces new slot-availability semantics (StorageInfo.isAvailable, Type.isDFS). There doesn’t appear to be a test asserting that remote disk infos are (a) included in the initial registration payload and (b) preserved across subsequent heartbeats so the master can allocate slots from them before/without the first heartbeat. Adding a focused unit/integration test around worker->master disk info propagation would help prevent regressions here.
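As a starting point for the test requested above, the property to check can be sketched without any Celeborn classes. This is a hedged illustration only: `RegistrationPayloadSketch`, `Disk`, `localOnly`, and `toPayload` are hypothetical names, and the real test would need to exercise the worker's actual registration path.

```java
// Hypothetical sketch: the registration payload is a map keyed by mount point.
// Building it from all disks keeps remote entries; building it from the
// local-only snapshot silently drops them.
final class RegistrationPayloadSketch {
  static final class Disk {
    final String mountPoint;
    final boolean isDFS;

    Disk(String mountPoint, boolean isDFS) {
      this.mountPoint = mountPoint;
      this.isDFS = isDFS;
    }
  }

  // Stand-in for a local-only snapshot: filters out remote (DFS) disks.
  static java.util.List<Disk> localOnly(java.util.List<Disk> all) {
    java.util.List<Disk> local = new java.util.ArrayList<>();
    for (Disk d : all) {
      if (!d.isDFS) {
        local.add(d);
      }
    }
    return local;
  }

  // Stand-in for the payload construction: mountPoint -> disk.
  static java.util.Map<String, Disk> toPayload(java.util.List<Disk> disks) {
    java.util.Map<String, Disk> payload = new java.util.LinkedHashMap<>();
    for (Disk d : disks) {
      payload.put(d.mountPoint, d);
    }
    return payload;
  }
}
```

A regression test along these lines would assert that the remote mount point is a key of the payload built from all disks, both at registration and after a simulated heartbeat.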
This is what I wrote in the original PR. I need someone from the existing community to guide me on writing an integration test.
SteNicholas left a comment
@Dzeri96, thanks for the contribution. Could you explain which fix this pull request mainly provides?
    - Type(int value) {
    + Type(int value, boolean isDFS, int mask) {
        this.value = value;
    +   this.isDFS = isDFS;
IMO, it's unnecessary to add the isDFS variable. The isDFS method is enough.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
    if (estimatedPartitionSize.nonEmpty && !newDisk.storageType.isDFS) {
      newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
      newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
    }
This change introduces new slot-update semantics for DFS vs local disks inside updateThenGetDiskInfos, but there are no unit tests covering that remote/DFS disk slot fields are preserved across successive updates (e.g., registration updateDiskSlots(...) followed by heartbeat updateThenGetDiskInfos(...)). Adding a focused test in WorkerInfoSuite for a DFS DiskInfo would help prevent regressions like remote disks becoming unavailable after the first heartbeat.
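The behavior such a test would pin down can be sketched in isolation. The snippet below mirrors the guard from the diff above in Java, using a hypothetical `Disk` holder rather than Celeborn's `DiskInfo`; field names follow the diff, but the class, the method name `updateSlots`, and the use of `OptionalLong` are assumptions, and a positive partition size is assumed.

```java
// Hypothetical sketch of the slot-update rule: local disks get their slot
// counts recomputed from disk space, DFS disks keep whatever was reported.
final class SlotUpdateSketch {
  static final class Disk {
    final boolean isDFS;
    final long totalSpace;
    final long actualUsableSpace;
    long maxSlots;
    long availableSlots;

    Disk(boolean isDFS, long totalSpace, long actualUsableSpace,
        long maxSlots, long availableSlots) {
      this.isDFS = isDFS;
      this.totalSpace = totalSpace;
      this.actualUsableSpace = actualUsableSpace;
      this.maxSlots = maxSlots;
      this.availableSlots = availableSlots;
    }
  }

  // Assumes a positive estimated partition size when one is present.
  static void updateSlots(Disk disk, java.util.OptionalLong estimatedPartitionSize) {
    if (estimatedPartitionSize.isPresent() && !disk.isDFS) {
      long size = estimatedPartitionSize.getAsLong();
      disk.maxSlots = disk.totalSpace / size;
      disk.availableSlots = disk.actualUsableSpace / size;
    }
    // DFS disks fall through untouched, so their slot fields survive
    // every update, including the one triggered by the first heartbeat.
  }
}
```

A `WorkerInfoSuite` case could follow the same shape: apply the update twice to a DFS disk and check that `maxSlots` and `availableSlots` are unchanged, while a local disk reflects the recomputed values.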
What changes were proposed in this pull request?
Why are the changes needed?
Does this PR resolve a correctness bug?
Yes
Does this PR introduce any user-facing change?
No
How was this patch tested?
Important: I want help from the community on how to write tests for this.