[CELEBORN-2252] Skip writing data from other attempts#3590
[CELEBORN-2252] Skip writing data from other attempts#3590taowenjun wants to merge 2 commits intoapache:mainfrom
Conversation
…e map attempts is completed
There was a problem hiding this comment.
Pull request overview
This PR adds logic to skip writing data from map task attempts that are not the completed attempt. When a map task completes and commits, the attempt ID is stored in shuffleMapperAttempts. If subsequent push data requests arrive from other attempts of the same map task, they are now rejected rather than written to disk.
Changes:
- Added attempt validation logic in the
writeDatanested function to check if incoming data is from the completed map attempt - Data from non-completed attempts now calls
decrementPendingWrites()instead offileWriter.write(body)and logs an info message
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| val (endedAttempt, toWrite, curMapId, curMapAttempt) = | ||
| if (shuffleMapperAttempts.containsKey(shuffleKey)) { | ||
| val (mapId, attemptId) = getMapAttempt(body) | ||
| val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId) |
There was a problem hiding this comment.
Potential ArrayIndexOutOfBoundsException when accessing shuffleMapperAttempts. If mapId is greater than or equal to the length of the AtomicIntegerArray, this will throw an exception. Consider adding a bounds check before accessing the array, similar to how other parts of the codebase handle this.
| val toWriteAttempt = attemptId == endedAttemptId | ||
| (endedAttemptId, toWriteAttempt, mapId, attemptId) | ||
| } else (-1, true, -1, -1) | ||
| if (endedAttempt == -1 || toWrite) { |
There was a problem hiding this comment.
The logic for determining whether to write has a subtle issue. When endedAttemptId is -1 (meaning the map task has not completed yet), the check 'attemptId == endedAttemptId' will only be true if attemptId is also -1, which is unlikely to be a valid scenario. The condition on line 1532 'endedAttempt == -1 || toWrite' correctly handles this by allowing writes when endedAttempt is -1, but the logic could be clearer. Consider restructuring to: if endedAttemptId is -1, always write; otherwise only write if attemptId matches endedAttemptId.
| val toWriteAttempt = attemptId == endedAttemptId | |
| (endedAttemptId, toWriteAttempt, mapId, attemptId) | |
| } else (-1, true, -1, -1) | |
| if (endedAttempt == -1 || toWrite) { | |
| val toWriteAttempt = | |
| if (endedAttemptId == -1) { | |
| // Map task has not completed yet, always write | |
| true | |
| } else { | |
| // Only write if the current attempt matches the ended attempt | |
| attemptId == endedAttemptId | |
| } | |
| (endedAttemptId, toWriteAttempt, mapId, attemptId) | |
| } else (-1, true, -1, -1) | |
| if (toWrite) { |
| fileWriter.write(body) | ||
| } else { | ||
| fileWriter.decrementPendingWrites() | ||
| logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt") |
There was a problem hiding this comment.
The log message could be more clear about what is happening. Currently it says "Shuffle $shuffleKey is committing" but at this point the shuffle has already been committed (the map task ended). Consider rephrasing to something like "Skipping data from map $curMapId attempt $curMapAttempt because map already completed with attempt $endedAttempt" for better clarity.
| logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt") | |
| logInfo( | |
| s"Skipping data from map $curMapId attempt $curMapAttempt for shuffle $shuffleKey " + | |
| s"because map already completed with attempt $endedAttempt") |
| val (endedAttempt, toWrite, curMapId, curMapAttempt) = | ||
| if (shuffleMapperAttempts.containsKey(shuffleKey)) { | ||
| val (mapId, attemptId) = getMapAttempt(body) | ||
| val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId) | ||
| val toWriteAttempt = attemptId == endedAttemptId | ||
| (endedAttemptId, toWriteAttempt, mapId, attemptId) | ||
| } else (-1, true, -1, -1) | ||
| if (endedAttempt == -1 || toWrite) { | ||
| fileWriter.write(body) | ||
| } else { | ||
| fileWriter.decrementPendingWrites() | ||
| logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt") | ||
| } |
There was a problem hiding this comment.
The new logic for skipping data from non-ended map attempts lacks test coverage. Consider adding unit tests to verify that: 1) data from the ended attempt is written successfully, 2) data from other attempts is skipped and decrementPendingWrites is called, 3) edge cases like mapId out of bounds are handled gracefully, and 4) the behavior when shuffleMapperAttempts doesn't contain the shuffle key.
…e map attempts is completed
|
@taowenjun, please fix the failure of CI. |
| index: Int): Unit = { | ||
| try { | ||
| fileWriter.write(body) | ||
| val (endedAttempt, toWrite, curMapId, curMapAttempt) = |
There was a problem hiding this comment.
Celeborn already check that otherMapAttempt is completed before executing writeData to avoid unnecessary writes. Why is it necessary to perform an additional check within writeData to skip data writing?
There was a problem hiding this comment.
As communicated, I have added logs to prove it.
There was a problem hiding this comment.
Thanks, you're correct. Previously, the mapper end check was performed only when the partition location was null during the handling of PushData/PushMergeData. I believe we can enhance this by extending the check to the mapper level, regardless of whether the partition location is null.
What changes were proposed in this pull request?
Skip writing data from other attempts after one of the map attempts is completed.
Why are the changes needed?
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?