diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 4cbff786650..c3ce1d39671 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1522,7 +1522,31 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler shuffleKey: String, index: Int): Unit = { try { - fileWriter.write(body) + val (endedAttempt, toWrite, curMapId, curMapAttempt) = + if (shuffleMapperAttempts.containsKey(shuffleKey)) { + val (mapId, attemptId) = getMapAttempt(body) + if (mapId >= shuffleMapperAttempts.get(shuffleKey).length()) { + throw new ArrayIndexOutOfBoundsException(s"MapId $mapId of shuffleKey " + + s"$shuffleKey out of array index bounds when accessing shuffleMapperAttempts.") + } + val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId) + val toWriteAttempt = + if (endedAttemptId == -1) { + // Map task has not completed yet, always write + true + } else { + // Only write if the current attempt matches the ended attempt + attemptId == endedAttemptId + } + (endedAttemptId, toWriteAttempt, mapId, attemptId) + } else (-1, true, -1, -1) + if (toWrite) { + fileWriter.write(body) + } else { + fileWriter.decrementPendingWrites() + logInfo( + s"Skipping data from map $curMapId attempt $curMapAttempt for shuffle $shuffleKey " + + s"because map already completed with attempt $endedAttempt") } result(index) = StatusCode.SUCCESS } catch { case e: Throwable =>