From 03274f8c009181d0a7f5013b2d863879f168f66a Mon Sep 17 00:00:00 2001 From: taowenjun <1483633867@qq.com> Date: Thu, 22 Jan 2026 00:59:22 +0800 Subject: [PATCH 1/2] [CELEBORN-2252] Skip writing data from other attempts after one of the map attempts is completed --- .../service/deploy/worker/PushDataHandler.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 4cbff786650..05b744adc95 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler shuffleKey: String, index: Int): Unit = { try { - fileWriter.write(body) + val (endedAttempt, toWrite, curMapId, curMapAttempt) = + if (shuffleMapperAttempts.containsKey(shuffleKey)) { + val (mapId, attemptId) = getMapAttempt(body) + val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId) + val toWriteAttempt = attemptId == endedAttemptId + (endedAttemptId, toWriteAttempt, mapId, attemptId) + } else (-1, true, -1, -1) + if (endedAttempt == -1 || toWrite) { + fileWriter.write(body) + } else { + fileWriter.decrementPendingWrites() + logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt") + } result(index) = StatusCode.SUCCESS } catch { case e: Throwable => From 28b9e8a44601841ee0de5f498a0a6e4b35146197 Mon Sep 17 00:00:00 2001 From: taowenjun <1483633867@qq.com> Date: Sun, 25 Jan 2026 23:09:18 +0800 Subject: [PATCH 2/2] [CELEBORN-2252] Skip writing data from other attempts after one of the map attempts is completed --- .../deploy/worker/PushDataHandler.scala | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 05b744adc95..c3ce1d39671 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1525,16 +1525,28 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler val (endedAttempt, toWrite, curMapId, curMapAttempt) = if (shuffleMapperAttempts.containsKey(shuffleKey)) { val (mapId, attemptId) = getMapAttempt(body) + if (mapId >= shuffleMapperAttempts.get(shuffleKey).length()) { + throw new ArrayIndexOutOfBoundsException(s"MapId $mapId of shuffleKey " + + s"$shuffleKey out of array index bounds when accessing shuffleMapperAttempts.") + } val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId) - val toWriteAttempt = attemptId == endedAttemptId + val toWriteAttempt = + if (endedAttemptId == -1) { + // Map task has not completed yet, always write + true + } else { + // Only write if the current attempt matches the ended attempt + attemptId == endedAttemptId + } (endedAttemptId, toWriteAttempt, mapId, attemptId) } else (-1, true, -1, -1) - if (endedAttempt == -1 || toWrite) { + if (toWrite) { fileWriter.write(body) } else { fileWriter.decrementPendingWrites() - logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt") - } + logInfo( + s"Skipping data from map $curMapId attempt $curMapAttempt for shuffle $shuffleKey " + + s"because map already completed with attempt $endedAttempt") } result(index) = StatusCode.SUCCESS } catch { case e: Throwable =>