@@ -448,7 +448,7 @@ class RecordsFetcher extends Thread {
RecordsFetcher(int partitionId) {
this.partitionId = partitionId;
this.sleepTime = initFetchSleepTime;
-    this.recordBuffer = new RecordBuffer(partitionId);
+    this.recordBuffer = new RecordBuffer<>(partitionId);
this.nextQueue =
combiner == null ? mergeBuffers.get(partitionId) : combineBuffers.get(partitionId);
this.serverInfos = shuffleServerInfoMap.get(partitionId);
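The change above replaces a raw-type instantiation with the diamond operator, so the buffer picks up the key/value type parameters declared on the field instead of erasing them. A minimal standalone sketch of what the raw type loses, using a hypothetical `RecordBuffer` rather than Uniffle's actual class:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch with a hypothetical RecordBuffer (not Uniffle's actual
// class) showing what a raw-type instantiation loses.
class RecordBuffer<K, V> {
  private final int partitionId;
  private final List<K> keys = new ArrayList<>();
  private final List<V> values = new ArrayList<>();

  RecordBuffer(int partitionId) {
    this.partitionId = partitionId;
  }

  void addRecord(K key, V value) {
    keys.add(key);
    values.add(value);
  }
}

class DiamondDemo {
  public static void main(String[] args) {
    // Raw type: compiles (with an unchecked warning) because addRecord
    // degrades to addRecord(Object, Object); a String key slips into a
    // buffer meant for Integer keys.
    RecordBuffer raw = new RecordBuffer(0);
    raw.addRecord("oops", 1);

    // Diamond operator: the compiler infers <Integer, Integer> from the
    // declared type and rejects a mismatched key at compile time.
    RecordBuffer<Integer, Integer> typed = new RecordBuffer<>(0);
    typed.addRecord(42, 1);
    // typed.addRecord("oops", 1);  // would not compile
  }
}
```

Since Java 7, the diamond infers the type arguments from the declaration site, so the fix adds compile-time type safety without extra verbosity.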
@@ -512,7 +512,7 @@ public void run() {
// split into two different threads, they would become asynchronous processes. Although it
// seems to save time, it actually consumes more memory.
reader =
-        new RecordsReader(
+        new RecordsReader<>(
rssConf,
SerInputStream.newInputStream(byteBuf),
keyClass,
@@ -526,7 +526,7 @@
}
if (recordBuffer.size() >= maxRecordsNumPerBuffer) {
nextQueue.put(recordBuffer);
-        recordBuffer = new RecordBuffer(partitionId);
+        recordBuffer = new RecordBuffer<>(partitionId);
}
recordBuffer.addRecord(reader.getCurrentKey(), reader.getCurrentValue());
}
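For context, the flush in run() above follows a common batching pattern: once the buffer reaches maxRecordsNumPerBuffer it is handed to the next stage over a blocking queue and replaced with a fresh instance, so the old buffer becomes exclusively owned by the consumer. A simplified sketch under those assumptions, with plain lists standing in for RecordBuffer and illustrative names throughout:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Standalone sketch of the flush-and-replace pattern in RecordsFetcher.run().
class BufferFlushSketch {
  static final int MAX_RECORDS_PER_BUFFER = 4; // stand-in for maxRecordsNumPerBuffer

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<List<Integer>> nextQueue = new ArrayBlockingQueue<>(8);
    List<Integer> buffer = new ArrayList<>();

    for (int record = 0; record < 10; record++) {
      if (buffer.size() >= MAX_RECORDS_PER_BUFFER) {
        nextQueue.put(buffer); // blocks if the downstream stage falls behind
        buffer = new ArrayList<>(); // fresh buffer; the old one is now owned downstream
      }
      buffer.add(record);
    }
    if (!buffer.isEmpty()) {
      nextQueue.put(buffer); // flush the tail
    }
    System.out.println("queued batches: " + nextQueue.size()); // 3
  }
}
```

Replacing the buffer rather than clearing it avoids sharing mutable state between the fetcher and the consumer thread, and the bounded queue provides natural backpressure.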
@@ -561,12 +561,12 @@ class RecordsCombiner extends Thread {
// The RecordBuffer has a capacity limit, so records for the same key may be
// spread across different RecordBuffers. We therefore need a cached buffer
// that holds the result of the last combine.
-  private RecordBuffer cached;
+  private RecordBuffer<K, C> cached;
private Queue<RecordBuffer> nextQueue;

RecordsCombiner(int partitionId) {
this.partitionId = partitionId;
-    this.cached = new RecordBuffer(partitionId);
+    this.cached = new RecordBuffer<>(partitionId);
this.nextQueue = mergeBuffers.get(partitionId);
setName("RecordsCombiner-" + partitionId);
}
@@ -589,13 +589,13 @@ public void run() {
// we can send the cached buffer downstream directly.
if (cached.size() > 0 && !isSameKey(cached.getLastKey(), current.getFirstKey())) {
sendCachedBuffer(cached);
-          cached = new RecordBuffer(partitionId);
+          cached = new RecordBuffer<>(partitionId);
}

// 3. combine the current buffer, then cache it. This way, we can handle the special
// case where the next record buffer has the same key as the current one.
-        RecordBlob recordBlob = new RecordBlob(partitionId);
+        RecordBlob recordBlob = new RecordBlob<>(partitionId);
recordBlob.addRecords(current);
recordBlob.combine(combiner, isMapCombine);
for (Object record : recordBlob.getResult()) {
@@ -616,7 +616,7 @@ public void run() {
private void sendCachedBuffer(RecordBuffer<K, C> cachedBuffer) throws InterruptedException {
// Multiple records with the same key may span different RecordBuffers, and records are
// only combined within the same RecordBuffer. So before sending downstream, we should combine the cached buffer.
-    RecordBlob recordBlob = new RecordBlob(partitionId);
+    RecordBlob recordBlob = new RecordBlob<K, C, Object>(partitionId);
recordBlob.addRecords(cachedBuffer);
recordBlob.combine(combiner, true);
RecordBuffer recordBuffer = new RecordBuffer<>(partitionId);
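Read together, the combine loop and sendCachedBuffer above implement a cross-buffer combine: each buffer's combined result is cached, and the cache is flushed (after one final combine) only when a buffer arrives whose first key differs from the cached trailing key. A self-contained sketch of that idea, with a hypothetical summing combiner standing in for Uniffle's combiner/RecordBlob API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the cross-buffer combine: values for one key may
// straddle consecutive buffers, so each buffer's combined result is cached
// and only flushed once a buffer starting with a different key arrives.
class CombineSketch {
  record Rec(String key, int value) {}

  public static void main(String[] args) {
    // Key "b" straddles the two buffers and must end up as 2 + 3 = 5.
    List<List<Rec>> buffers =
        List.of(
            List.of(new Rec("a", 1), new Rec("b", 2)),
            List.of(new Rec("b", 3), new Rec("c", 4)));

    Map<String, Integer> emitted = new LinkedHashMap<>();
    List<Rec> cached = new ArrayList<>();
    for (List<Rec> current : buffers) {
      // Mirrors the run() loop: flush the cache only when the new buffer
      // cannot continue the cached trailing key.
      if (!cached.isEmpty()
          && !cached.get(cached.size() - 1).key().equals(current.get(0).key())) {
        sendCachedBuffer(cached, emitted);
        cached = new ArrayList<>();
      }
      // Combine within the current buffer, then cache the partial result.
      cached.addAll(combine(current));
    }
    sendCachedBuffer(cached, emitted);
    System.out.println(emitted); // {a=1, b=5, c=4}
  }

  // Sum values per key, preserving first-seen key order.
  static List<Rec> combine(List<Rec> recs) {
    Map<String, Integer> sums = new LinkedHashMap<>();
    for (Rec r : recs) {
      sums.merge(r.key(), r.value(), Integer::sum);
    }
    List<Rec> out = new ArrayList<>();
    sums.forEach((k, v) -> out.add(new Rec(k, v)));
    return out;
  }

  // Mirrors sendCachedBuffer: combine the cache once more, since entries
  // for the same key may have arrived from different buffers.
  static void sendCachedBuffer(List<Rec> cached, Map<String, Integer> emitted) {
    for (Rec r : combine(cached)) {
      emitted.put(r.key(), r.value());
    }
  }
}
```

The second combine in sendCachedBuffer is needed because the cache can accumulate same-key partials from several buffers before a differing key finally triggers the flush.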