Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
.setKeepBlockProgress(false).build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ public class ScannerContext {
boolean keepProgress;
private static boolean DEFAULT_KEEP_PROGRESS = false;

/**
* For some chores (specifically Compaction) we use the block size limit to avoid too long next()
* calls. In those cases the entire region is handled with a single ScannerContext, and we do want
* to reset the block progress.
*/
boolean keepBlockProgress;
private static boolean DEFAULT_KEEP_BLOCK_PROGRESS = true;

/**
* Allows temporarily ignoring limits and skipping tracking of batch and size progress. Used when
* skipping to the next row, in which case all processed cells are thrown away so should not count
Expand All @@ -129,6 +137,11 @@ public class ScannerContext {

ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics,
ServerSideScanMetrics scanMetrics) {
this(keepProgress, DEFAULT_KEEP_BLOCK_PROGRESS, limitsToCopy, trackMetrics, scanMetrics);
}

ScannerContext(boolean keepProgress, boolean keepBlockProgress, LimitFields limitsToCopy,
boolean trackMetrics, ServerSideScanMetrics scanMetrics) {
this.limits = new LimitFields();
if (limitsToCopy != null) {
this.limits.copy(limitsToCopy);
Expand All @@ -138,6 +151,7 @@ public class ScannerContext {
progress = new ProgressFields(0, 0, 0, 0);

this.keepProgress = keepProgress;
this.keepBlockProgress = keepBlockProgress;
this.scannerState = DEFAULT_STATE;
this.metrics =
trackMetrics ? (scanMetrics != null ? scanMetrics : new ServerSideScanMetrics()) : null;
Expand Down Expand Up @@ -259,11 +273,15 @@ void setBatchProgress(int batchProgress) {
/**
* Clear away any progress that has been made so far. All progress fields are reset to initial
* values. Only clears progress that should reset between rows. {@link #getBlockSizeProgress()} is
* not reset because it increments for all blocks scanned whether the result is included or
* filtered.
* not reset by default because it increments for all blocks scanned whether the result is
* included or filtered.
*/
void clearProgress() {
progress.setFields(0, 0, 0, getBlockSizeProgress());
if (keepBlockProgress) {
progress.setFields(0, 0, 0, getBlockSizeProgress());
} else {
progress.setFields(0, 0, 0, 0);
}
}

/**
Expand Down Expand Up @@ -421,6 +439,7 @@ public static Builder newBuilder(boolean keepProgress) {

public static final class Builder {
boolean keepProgress = DEFAULT_KEEP_PROGRESS;
boolean keepBlockProgress = DEFAULT_KEEP_BLOCK_PROGRESS;
boolean trackMetrics = false;
LimitFields limits = new LimitFields();
ServerSideScanMetrics scanMetrics = null;
Expand All @@ -437,6 +456,11 @@ public Builder setKeepProgress(boolean keepProgress) {
return this;
}

public Builder setKeepBlockProgress(boolean keepblockProgress) {
this.keepBlockProgress = keepblockProgress;
return this;
}

public Builder setTrackMetrics(boolean trackMetrics) {
this.trackMetrics = trackMetrics;
return this;
Expand Down Expand Up @@ -468,7 +492,7 @@ public Builder setScanMetrics(ServerSideScanMetrics scanMetrics) {
}

public ScannerContext build() {
return new ScannerContext(keepProgress, limits, trackMetrics, scanMetrics);
return new ScannerContext(keepProgress, keepBlockProgress, limits, trackMetrics, scanMetrics);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
.setKeepBlockProgress(false).build();

throughputController.start(compactionName);
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
Expand Down