POC to avoid usage of ReplicationResult #7528
base: HBASE-28957
This file was deleted.
```diff
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
 import java.util.UUID;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
```

```diff
@@ -59,10 +60,11 @@ private void checkCell(Cell cell) {
   }
 
   @Override
-  public ReplicationResult replicate(ReplicateContext replicateContext) {
+  public boolean replicate(ReplicateContext replicateContext) throws IOException {
     replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream())
       .forEach(this::checkCell);
-    return ReplicationResult.COMMITTED;
+    getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
```
Contributor: Why do we need to call this here?
```diff
+    return true;
   }
 
   @Override
```
```diff
@@ -48,7 +48,6 @@
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
```

```diff
@@ -425,7 +424,7 @@ private long parallelReplicate(ReplicateContext replicateContext, List<List<Entr
    * Do the shipping logic
    */
   @Override
-  public ReplicationResult replicate(ReplicateContext replicateContext) {
+  public boolean replicate(ReplicateContext replicateContext) throws IOException {
     int sleepMultiplier = 1;
     int initialTimeout = replicateContext.getTimeout();
```

```diff
@@ -445,7 +444,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
         lastSinkFetchTime = EnvironmentEdgeManager.currentTime();
       }
       sleepForRetries("No sinks available at peer", sleepMultiplier);
-      return ReplicationResult.FAILED;
+      return false;
     }
 
     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
```

```diff
@@ -459,7 +458,8 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
     try {
       // replicate the batches to sink side.
       parallelReplicate(replicateContext, batches);
-      return ReplicationResult.COMMITTED;
+      getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
```
Contributor: OK, I think I get why you call this method here: since at this point we can be sure the WAL entries have been persisted, it is OK for us to persist the offset. But I would prefer we follow the old way, where this is called in ReplicationSourceShipper.

Contributor: I think it's doable. @ankitsol ?
```diff
+      return true;
     } catch (IOException ioe) {
       if (ioe instanceof RemoteException) {
         if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
```

```diff
@@ -468,14 +468,18 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
           batches = filterNotExistTableEdits(batches);
           if (batches.isEmpty()) {
             LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return");
-            return ReplicationResult.COMMITTED;
+            getReplicationSource()
+              .cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
+            return true;
           }
         } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
           batches = filterNotExistColumnFamilyEdits(batches);
           if (batches.isEmpty()) {
             LOG.warn("After filter not exist column family's edits, 0 edits to replicate, "
               + "just return");
-            return ReplicationResult.COMMITTED;
+            getReplicationSource()
+              .cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
+            return true;
           }
         } else {
           LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
```

```diff
@@ -507,7 +511,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
         }
       }
     }
-    return ReplicationResult.FAILED; // in case we exited before replicating
+    return false; // in case we exited before replicating
   }
 
   protected boolean isPeerEnabled() {
```
What I mean is that we should add a method, maybe called beforePersistingReplicationOffset, and call it before we call updateLogPosition in ReplicationSourceShipper. For the old implementation we just do nothing, since we can make sure that everything is already persisted; for an S3 based endpoint, we close the writer to persist the data on S3.

You cannot close the writer every time something is shipped, because closing and re-opening the same stream is a costly operation, if it is supported at all. We have to wait until enough data has been shipped (file size limit) or the configured time has elapsed (time limit) before closing the current stream and opening a new one. This is controlled by the replication endpoint itself.
So you also need to change the logic in ReplicationSourceShipper to not always record the offset after shipping. And I do not think this can only be controlled by the replication endpoint: in ReplicationSourceShipper you know the size of the WAL entries and how much time has elapsed since the last recording, so it should be easy to implement the logic there?
That's an interesting idea. @vinayakphegde @ankitsol wdyt?

@Apache9 Let's say the ReplicationSourceShipper controls when to record the offset. How would it know which kind of replication endpoint it is working with, i.e. whether it needs to record the offset after every shipment or use a time/size limit? Shall we make it a new attribute of the endpoints?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer we control it by time/size limit.
Even if the endpoint can persist the data after every shipment, we do not need to record the offset every time right? We just need to make sure that once the
ReplicationSourceShipperwant to record the offset, all the data before this offset has been persistent. So we can introduce abeforePersistingReplicationOffsetmethod for replication endpoint, if you persist the data after every shipment, you just need to do nothing. If it is S3 based endpoint, we close the output file to persist the data.In this way, the ReplicationSourceShipper does not need to know whether the endpoint can persist the data or not after every shipment. And in the future, for HBaseInterClusterReplicationEndpoint, we could also introduce some asynchronous mechanism to increase performance.
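A minimal sketch of how such a hook could look, assuming a default no-op on the endpoint base class and an override in an S3 backed endpoint; the class and field names below are illustrative placeholders, not existing HBase APIs:

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch only: beforePersistingReplicationOffset is the method proposed in this
// thread; the class and field names are illustrative placeholders.
abstract class ReplicationEndpointSketch {
  /**
   * Called by the shipper just before it records the replication offset.
   * Default is a no-op: endpoints that persist data on every shipment already
   * guarantee that everything before this offset is durable.
   */
  void beforePersistingReplicationOffset() throws IOException {
    // nothing to do for endpoints that persist on every replicate() call
  }
}

class S3BackedEndpointSketch extends ReplicationEndpointSketch {
  // Assumed stand-in for whatever output stream/writer the endpoint keeps open on S3.
  private Closeable currentOutputFile;

  @Override
  void beforePersistingReplicationOffset() throws IOException {
    if (currentOutputFile != null) {
      currentOutputFile.close(); // closing the file is what makes the data durable on S3
      currentOutputFile = null;  // a new file will be opened on the next shipment
    }
  }
}
```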
Are we talking about modifying the behaviour of existing replication endpoints? Currently both data and offsets are persisted at every shipment. Would you like to change this to be controlled by time and size limits in general?
@Apache9 This seems like a good approach. We also want it to be both time and size based. I have two questions regarding the time based approach:

- In ContinuousBackupReplicationEndpoint we implemented it as a separate thread.
- Should the logic go into ReplicationSourceShipper or ReplicationSource, considering ReplicationSourceShipper is itself a thread?

CC @anmolnar @vinayakphegde
I think changing the default behavior to be size/time based is OK, and we can make size = 0 mean no size limit and size = -1 mean persisting after every flush; this way we can make the default size limit -1 to keep the old behavior.
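For illustration only, the -1 / 0 / positive size-limit convention could be interpreted roughly like this; the configuration key and class name are made-up placeholders, not actual HBase properties:

```java
import org.apache.hadoop.conf.Configuration;

// Sketch of the size-limit convention described above; the config key is a
// made-up placeholder, not an actual HBase property.
final class OffsetPersistSizePolicySketch {
  private final long sizeLimit; // -1: after every shipment, 0: no size limit, >0: byte threshold

  OffsetPersistSizePolicySketch(Configuration conf) {
    this.sizeLimit = conf.getLong("hbase.replication.offset.persist.size.limit", -1L);
  }

  boolean sizeLimitReached(long bytesShippedSinceLastPersist) {
    if (sizeLimit == -1L) {
      return true;  // keep the old behavior: record the offset after every shipment
    }
    if (sizeLimit == 0L) {
      return false; // no size-based trigger; rely on the time limit instead
    }
    return bytesShippedSinceLastPersist >= sizeLimit;
  }
}
```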
Sounds good to me.
I think we can put the logic in ReplicationSourceShipper? There is a while loop in that thread; after every shipment we can calculate the size and time and determine whether we should persist the offset.
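A rough sketch of what that could look like inside the shipper's loop. Only updateLogPosition and beforePersistingReplicationOffset come from the discussion above; every other name (shipOneBatch, sizeLimit, timeLimitMs, endpoint) is an assumed placeholder, and the call signatures are simplified compared to the real ReplicationSourceShipper:

```java
// Sketch of the proposed size/time based offset recording in the shipper loop.
private void shipLoopSketch() throws IOException {
  long bytesSinceLastPersist = 0L;
  long lastPersistTime = EnvironmentEdgeManager.currentTime();

  while (isActive()) {
    // Placeholder for the existing step: read a batch of WAL entries and call
    // endpoint.replicate(); returns the number of bytes shipped.
    long shippedBytes = shipOneBatch();
    bytesSinceLastPersist += shippedBytes;

    long elapsedMs = EnvironmentEdgeManager.currentTime() - lastPersistTime;
    boolean persistEveryShipment = sizeLimit == -1L;   // default keeps the old behavior
    boolean sizeReached = sizeLimit > 0L && bytesSinceLastPersist >= sizeLimit;
    boolean timeReached = elapsedMs >= timeLimitMs;

    if (persistEveryShipment || sizeReached || timeReached) {
      // Let the endpoint make the shipped data durable first (a no-op for the
      // classic endpoint, close/roll the output file for an S3 based one) ...
      endpoint.beforePersistingReplicationOffset();
      // ... and only then record the offset, so it never points past persisted data.
      updateLogPosition();
      bytesSinceLastPersist = 0L;
      lastPersistTime = EnvironmentEdgeManager.currentTime();
    }
  }
}
```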