diff --git a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
index 11f6fc6a14..43be684cb9 100644
--- a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
+++ b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
@@ -448,7 +448,7 @@ class RecordsFetcher extends Thread {
     RecordsFetcher(int partitionId) {
       this.partitionId = partitionId;
       this.sleepTime = initFetchSleepTime;
-      this.recordBuffer = new RecordBuffer(partitionId);
+      this.recordBuffer = new RecordBuffer<>(partitionId);
       this.nextQueue =
           combiner == null ? mergeBuffers.get(partitionId) : combineBuffers.get(partitionId);
       this.serverInfos = shuffleServerInfoMap.get(partitionId);
@@ -512,7 +512,7 @@ public void run() {
             // split into two different threads, then will be asynchronous processes. Although it
             // seems to save time, it actually consumes more memory.
             reader =
-                new RecordsReader(
+                new RecordsReader<>(
                     rssConf,
                     SerInputStream.newInputStream(byteBuf),
                     keyClass,
@@ -526,7 +526,7 @@ public void run() {
             }
             if (recordBuffer.size() >= maxRecordsNumPerBuffer) {
               nextQueue.put(recordBuffer);
-              recordBuffer = new RecordBuffer(partitionId);
+              recordBuffer = new RecordBuffer<>(partitionId);
             }
             recordBuffer.addRecord(reader.getCurrentKey(), reader.getCurrentValue());
           }
@@ -561,12 +561,12 @@ class RecordsCombiner extends Thread {
     // The RecordBuffer has a capacity limit, so records for the same key may be
     // distributed across different RecordBuffers. So we need a cached buffer
     // to record the buffer of the last combine.
-    private RecordBuffer cached;
+    private RecordBuffer<K, C> cached;
     private Queue nextQueue;
 
     RecordsCombiner(int partitionId) {
       this.partitionId = partitionId;
-      this.cached = new RecordBuffer(partitionId);
+      this.cached = new RecordBuffer<>(partitionId);
       this.nextQueue = mergeBuffers.get(partitionId);
       setName("RecordsCombiner-" + partitionId);
     }
@@ -589,13 +589,13 @@ public void run() {
           // we can send the cached to downstream directly.
           if (cached.size() > 0 && !isSameKey(cached.getLastKey(), current.getFirstKey())) {
             sendCachedBuffer(cached);
-            cached = new RecordBuffer(partitionId);
+            cached = new RecordBuffer<>(partitionId);
           }
 
           // 3 combine the current, then cache it. This way, we can handle the
           // special case where the next record
           // buffer has the same key as the current one.
-          RecordBlob recordBlob = new RecordBlob(partitionId);
+          RecordBlob recordBlob = new RecordBlob<>(partitionId);
           recordBlob.addRecords(current);
           recordBlob.combine(combiner, isMapCombine);
           for (Object record : recordBlob.getResult()) {
@@ -616,7 +616,7 @@ public void run() {
     private void sendCachedBuffer(RecordBuffer cachedBuffer) throws InterruptedException {
       // Multiple records with the same key may span different RecordBuffers; we only combined
       // within the same RecordBuffer. So before sending downstream, we should combine the cached.
-      RecordBlob recordBlob = new RecordBlob(partitionId);
+      RecordBlob recordBlob = new RecordBlob<>(partitionId);
       recordBlob.addRecords(cachedBuffer);
       recordBlob.combine(combiner, true);
       RecordBuffer recordBuffer = new RecordBuffer<>(partitionId);
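
For context: every change in this patch replaces a raw-type instantiation with the diamond operator, so the compiler can type-check the buffers instead of emitting unchecked warnings. Below is a minimal, self-contained sketch of the difference; `Buffer` is a hypothetical stand-in, not Uniffle's `RecordBuffer`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a class like RecordBuffer, used only to
// illustrate the raw-type vs. diamond-operator difference in this patch.
class Buffer<K, V> {
  private final int partitionId;
  private final List<K> keys = new ArrayList<>();
  private final List<V> values = new ArrayList<>();

  Buffer(int partitionId) {
    this.partitionId = partitionId;
  }

  void addRecord(K key, V value) {
    keys.add(key);
    values.add(value);
  }
}

class RawTypeDemo {
  public static void main(String[] args) {
    // Raw type: compiles, but every member use is unchecked, so the
    // compiler warns and type mismatches only surface at runtime.
    Buffer raw = new Buffer(0);
    raw.addRecord("key", 1); // unchecked call

    // Diamond operator: type arguments are inferred from the declaration,
    // giving full compile-time checking with no extra verbosity.
    Buffer<String, Integer> typed = new Buffer<>(0);
    typed.addRecord("key", 1); // checked call
  }
}
```

The `+` lines in the diff are exactly this second form; behavior is unchanged, only compile-time checking improves.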
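The comments in `RecordsCombiner` describe the subtle part of this code path: `RecordBuffer` is capacity-limited, so a run of records sharing one key can straddle two buffers, which is why the last combined buffer is cached and re-combined before being sent downstream. Here is a self-contained sketch of that control flow with a summing combiner; `Rec`, `combine`, and the buffer lists are illustrative stand-ins, not the Uniffle API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the cached-buffer combine described in the RecordsCombiner
// comments above; the real code uses RecordBuffer/RecordBlob and a
// pluggable combiner instead of these stand-ins.
public class CombineSketch {

  record Rec(int key, int value) {}

  // Combine adjacent records with equal keys inside a single buffer
  // (buffers arrive sorted by key, mirroring the merge-sorted input).
  static List<Rec> combine(List<Rec> buffer) {
    List<Rec> out = new ArrayList<>();
    for (Rec r : buffer) {
      int last = out.size() - 1;
      if (last >= 0 && out.get(last).key() == r.key()) {
        out.set(last, new Rec(r.key(), out.get(last).value() + r.value()));
      } else {
        out.add(r);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Two capacity-limited buffers; the run of key 2 straddles the boundary.
    List<List<Rec>> incoming = List.of(
        List.of(new Rec(1, 1), new Rec(2, 1), new Rec(2, 1)),
        List.of(new Rec(2, 1), new Rec(3, 1)));

    List<Rec> cached = new ArrayList<>();
    List<List<Rec>> downstream = new ArrayList<>();

    for (List<Rec> current : incoming) {
      // Mirrors isSameKey(cached.getLastKey(), current.getFirstKey()):
      // if the keys differ, nothing can span the seam, so flush the cache.
      if (!cached.isEmpty()
          && cached.get(cached.size() - 1).key() != current.get(0).key()) {
        downstream.add(combine(cached)); // re-combine, as in sendCachedBuffer
        cached = new ArrayList<>();
      }
      // Combine within the current buffer, then cache the result in case
      // the next buffer continues the same key run.
      cached.addAll(combine(current));
    }
    if (!cached.isEmpty()) {
      downstream.add(combine(cached)); // flush the final cache
    }

    // Prints one buffer with key 2 fully combined across the seam:
    // [[Rec[key=1, value=1], Rec[key=2, value=3], Rec[key=3, value=1]]]
    System.out.println(downstream);
  }
}
```

Without the cached buffer, the two halves of the key-2 run would be combined separately and emitted as two records, which is exactly the case `sendCachedBuffer` guards against.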