From 2e459f46ce2018d24b32eace0b41caf7857ba974 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Tue, 11 Feb 2025 11:44:08 +0800 Subject: [PATCH 1/2] [#2362] feat(client): Followup to log more compression infos --- .../spark/shuffle/writer/WriteBufferManager.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java index 7e4e893e31..2213acd4f4 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java @@ -92,6 +92,7 @@ public class WriteBufferManager extends MemoryConsumer { private SerializationStream serializeStream; private WrappedByteArrayOutputStream arrayOutputStream; private long uncompressedDataLen = 0; + private long compressedDataLen = 0; private long requireMemoryInterval; private int requireMemoryRetryMax; private Optional codec; @@ -430,6 +431,7 @@ protected ShuffleBlockInfo createShuffleBlock(int partitionId, WriterBuffer wb) blockIdLayout.getBlockId(getNextSeqNo(partitionId), partitionId, taskAttemptId); blockCounter.incrementAndGet(); uncompressedDataLen += data.length; + compressedDataLen += compressed.length; shuffleWriteMetrics.incBytesWritten(compressed.length); // add memory to indicate bytes which will be sent to shuffle server inSendListBytes.addAndGet(wb.getMemoryUsed()); @@ -648,14 +650,18 @@ public String getManagerCostInfo() { + serializeTime + "], sortTime[" + sortTime - + "], compressTime[" - + compressTime + "], estimateTime[" + estimateTime + "], requireMemoryTime[" + requireMemoryTime + "], uncompressedDataLen[" + uncompressedDataLen + + "], compressedDataLen[" + + compressedDataLen + + "], compressTime[" + + compressTime + + "], compressRatio[" + + (float) uncompressedDataLen / compressedDataLen + "]"; } From c150f142ff6003d8d75f7b5baa1bdb292a63dfdb Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Tue, 11 Feb 2025 13:30:22 +0800 Subject: [PATCH 2/2] fix --- .../org/apache/spark/shuffle/writer/WriteBufferManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java index 2213acd4f4..cf4b4bc51c 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java @@ -661,7 +661,7 @@ public String getManagerCostInfo() { + "], compressTime[" + compressTime + "], compressRatio[" - + (float) uncompressedDataLen / compressedDataLen + + (compressedDataLen == 0 ? 0 : (float) uncompressedDataLen / compressedDataLen) + "]"; }