From 50d4c2a18f1032a33a938e976db07de0c28a893f Mon Sep 17 00:00:00 2001 From: cchung100m Date: Mon, 3 Feb 2025 21:19:58 +0800 Subject: [PATCH 1/4] [#2353][Improvement] Fix the warning: unchecked method invocation: method sendCachedBuffer in class RMRecordsReader.RecordsCombiner is applied to given types --- .../client/record/reader/RMRecordsReader.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java index 11f6fc6a14..5200020310 100644 --- a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java +++ b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java @@ -262,6 +262,7 @@ public boolean next() throws IOException { curr = results.take(); return curr != null; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); } } @@ -289,6 +290,7 @@ public boolean next() throws IOException { curr = results.take(); return curr != null; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); } } @@ -334,6 +336,7 @@ public boolean next() throws IOException { return true; } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); } } @@ -385,6 +388,7 @@ public C next() { curr = results.take(); return ret; } catch (InterruptedException | IOException e) { + Thread.currentThread().interrupt(); throw new RssException(e); } } @@ -512,7 +516,7 @@ public void run() { // split into two different threads, then will be asynchronous processes. Although it // seems to save time, it actually consumes more memory. reader = - new RecordsReader( + new RecordsReader<>( rssConf, SerInputStream.newInputStream(byteBuf), keyClass, @@ -595,7 +599,7 @@ public void run() { // 3 combine the current, then cache it. By this way, we can handle the specical case // that next record // buffer has same key in current. - RecordBlob recordBlob = new RecordBlob(partitionId); + RecordBlob recordBlob = new RecordBlob<>(partitionId); recordBlob.addRecords(current); recordBlob.combine(combiner, isMapCombine); for (Object record : recordBlob.getResult()) { @@ -608,6 +612,7 @@ public void run() { } } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RssException(e); } } @@ -616,7 +621,7 @@ public void run() { private void sendCachedBuffer(RecordBuffer cachedBuffer) throws InterruptedException { // Multiple records with the same key may span different recordbuffers. we were only combined // within the same recordbuffer. So before send to downstream, we should combine the cached. - RecordBlob recordBlob = new RecordBlob(partitionId); + RecordBlob recordBlob = new RecordBlob(partitionId); recordBlob.addRecords(cachedBuffer); recordBlob.combine(combiner, true); RecordBuffer recordBuffer = new RecordBuffer<>(partitionId); @@ -656,6 +661,7 @@ public void run() { } return new BufferedSegment(recordBuffer); } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); throw new RssException(ex); } }); @@ -671,6 +677,7 @@ public void run() { } catch (InterruptedException | IOException e) { error = e; stop = true; + Thread.currentThread().interrupt(); } } } From e65b91317ca6bc729741c053db9c955145e46c56 Mon Sep 17 00:00:00 2001 From: cchung100m Date: Sat, 8 Feb 2025 17:17:47 +0800 Subject: [PATCH 2/4] [#2353][Improvement] Fix the warning: unchecked method invocation: method sendCachedBuffer in class RMRecordsReader.RecordsCombiner is applied to given types --- .../uniffle/client/record/reader/RMRecordsReader.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java index 5200020310..0063a7fb22 100644 --- a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java +++ b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java @@ -452,7 +452,7 @@ class RecordsFetcher extends Thread { RecordsFetcher(int partitionId) { this.partitionId = partitionId; this.sleepTime = initFetchSleepTime; - this.recordBuffer = new RecordBuffer(partitionId); + this.recordBuffer = new RecordBuffer<>(partitionId); this.nextQueue = combiner == null ? mergeBuffers.get(partitionId) : combineBuffers.get(partitionId); this.serverInfos = shuffleServerInfoMap.get(partitionId); @@ -530,7 +530,7 @@ public void run() { } if (recordBuffer.size() >= maxRecordsNumPerBuffer) { nextQueue.put(recordBuffer); - recordBuffer = new RecordBuffer(partitionId); + recordBuffer = new RecordBuffer<>(partitionId); } recordBuffer.addRecord(reader.getCurrentKey(), reader.getCurrentValue()); } @@ -570,7 +570,7 @@ class RecordsCombiner extends Thread { RecordsCombiner(int partitionId) { this.partitionId = partitionId; - this.cached = new RecordBuffer(partitionId); + this.cached = new RecordBuffer<>(partitionId); this.nextQueue = mergeBuffers.get(partitionId); setName("RecordsCombiner-" + partitionId); } @@ -593,7 +593,7 @@ public void run() { // we can send the cached to downstream directly. if (cached.size() > 0 && !isSameKey(cached.getLastKey(), current.getFirstKey())) { sendCachedBuffer(cached); - cached = new RecordBuffer(partitionId); + cached = new RecordBuffer<>(partitionId); } // 3 combine the current, then cache it. By this way, we can handle the specical case From 3e6218238b64d4409358f2c324d80d015088ad8f Mon Sep 17 00:00:00 2001 From: cchung100m Date: Sun, 9 Feb 2025 14:39:57 +0800 Subject: [PATCH 3/4] [#2353][Improvement] Remove Thread.currentThread().interrupt(); --- .../uniffle/client/record/reader/RMRecordsReader.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java index 0063a7fb22..aeec16af06 100644 --- a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java +++ b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java @@ -262,7 +262,6 @@ public boolean next() throws IOException { curr = results.take(); return curr != null; } catch (InterruptedException e) { - Thread.currentThread().interrupt(); throw new IOException(e); } } @@ -290,7 +289,6 @@ public boolean next() throws IOException { curr = results.take(); return curr != null; } catch (InterruptedException e) { - Thread.currentThread().interrupt(); throw new IOException(e); } } @@ -336,7 +334,6 @@ public boolean next() throws IOException { return true; } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); throw new IOException(e); } } @@ -388,7 +385,6 @@ public C next() { curr = results.take(); return ret; } catch (InterruptedException | IOException e) { - Thread.currentThread().interrupt(); throw new RssException(e); } } @@ -612,7 +608,6 @@ public void run() { } } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); throw new RssException(e); } } @@ -661,7 +656,6 @@ public void run() { } return new BufferedSegment(recordBuffer); } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); throw new RssException(ex); } }); @@ -677,7 +671,6 @@ public void run() { } catch (InterruptedException | IOException e) { error = e; stop = true; - Thread.currentThread().interrupt(); } } } From bc10eb57df3c267bbaa60d578bf527ea19002b7e Mon Sep 17 00:00:00 2001 From: cchung100m Date: Thu, 13 Feb 2025 23:06:32 +0800 Subject: [PATCH 4/4] [#2353][Improvement] Fix the warning: unchecked method invocation: method sendCachedBuffer in class RMRecordsReader.RecordsCombiner is applied to given types --- .../apache/uniffle/client/record/reader/RMRecordsReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java index aeec16af06..43be684cb9 100644 --- a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java +++ b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java @@ -561,7 +561,7 @@ class RecordsCombiner extends Thread { // The RecordBuffer has a capacity limit, records for the same key may be // distributed in different RecordBuffers. So we need a cachedBuffer used // to record the buffer of the last combine. - private RecordBuffer cached; + private RecordBuffer cached; private Queue nextQueue; RecordsCombiner(int partitionId) {