@@ -47,8 +47,6 @@
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
@@ -83,7 +81,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
private final Map<Long, FSHLogProvider.Writer> walWriters = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();

private ReplicationSourceInterface replicationSource;
private Configuration conf;
private BackupFileSystemManager backupFileSystemManager;
private UUID peerUUID;
@@ -98,7 +95,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
@Override
public void init(Context context) throws IOException {
super.init(context);
this.replicationSource = context.getReplicationSource();
this.peerId = context.getPeerId();
this.conf = HBaseConfiguration.create(context.getConfiguration());

@@ -155,7 +151,7 @@ private void flushAndBackupSafely() {
try {
LOG.info("{} Periodic WAL flush triggered", Utils.logPeerId(peerId));
flushWriters();
replicationSource.persistOffsets();
getReplicationSource().persistOffsets();
LOG.info("{} Periodic WAL flush and offset persistence completed successfully",
Utils.logPeerId(peerId));
} catch (IOException e) {
@@ -220,11 +216,11 @@ public EmptyEntriesPolicy getEmptyEntriesPolicy() {
}

@Override
public ReplicationResult replicate(ReplicateContext replicateContext) {
public boolean replicate(ReplicateContext replicateContext) {
final List<WAL.Entry> entries = replicateContext.getEntries();
if (entries.isEmpty()) {
LOG.debug("{} No WAL entries to replicate", Utils.logPeerId(peerId));
return ReplicationResult.SUBMITTED;
return true;
}

LOG.debug("{} Received {} WAL entries for replication", Utils.logPeerId(peerId),
@@ -253,15 +249,16 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
Utils.logPeerId(peerId));
flushWriters();
LOG.debug("{} Replication committed after WAL flush", Utils.logPeerId(peerId));
return ReplicationResult.COMMITTED;
getReplicationSource().cleanupHFileRefsAndPersistOffsets(entries);
return true;
}

LOG.debug("{} Replication submitted successfully", Utils.logPeerId(peerId));
return ReplicationResult.SUBMITTED;
return true;
} catch (IOException e) {
LOG.error("{} Replication failed. Error details: {}", Utils.logPeerId(peerId), e.getMessage(),
e);
return ReplicationResult.FAILED;
return false;
} finally {
lock.unlock();
}
@@ -277,8 +274,8 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
private void updateLastReplicatedTimestampForContinuousBackup() throws IOException {
try (final Connection conn = ConnectionFactory.createConnection(conf);
BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
backupSystemTable.updateBackupCheckpointTimestamp(replicationSource.getServerWALsBelongTo(),
latestWALEntryTimestamp);
backupSystemTable.updateBackupCheckpointTimestamp(
getReplicationSource().getServerWALsBelongTo(), latestWALEntryTimestamp);
}
}

@@ -379,7 +376,7 @@ private void close() {
lock.lock();
try {
flushWriters();
replicationSource.persistOffsets();
getReplicationSource().persistOffsets();
} catch (IOException e) {
LOG.error("{} Failed to Flush Open Wal Writers: {}", Utils.logPeerId(peerId), e.getMessage(),
e);
---
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,13 +41,15 @@ public abstract class BaseReplicationEndpoint extends AbstractService
public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY =
"hbase.replication.source.custom.walentryfilters";
protected Context ctx;
private ReplicationSourceInterface replicationSource;

@Override
public void init(Context context) throws IOException {
this.ctx = context;

if (this.ctx != null) {
ReplicationPeer peer = this.ctx.getReplicationPeer();
this.replicationSource = context.getReplicationSource();
if (peer != null) {
peer.registerPeerConfigListener(this);
} else {
@@ -120,4 +123,8 @@ public boolean canReplicateToSameCluster() {
public boolean isStarting() {
return state() == State.STARTING;
}

public ReplicationSourceInterface getReplicationSource() {
return replicationSource;
}
}
---
@@ -216,7 +216,7 @@ public int getTimeout() {
* the context are assumed to be persisted in the target cluster.
Contributor:

What I mean is that we should add a method, maybe called beforePersistingReplicationOffset, and call it before we call updateLogPosition in the ReplicationSourceShipper. For the old implementations we just do nothing, as we can make sure that everything is persisted, and for the S3-based endpoint we close the writer to persist the data on S3.
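A minimal sketch of the hook being proposed here, assuming only the method name from this comment; the surrounding interface and the S3-style override are illustrative assumptions, not code from this PR:

```java
import java.io.IOException;

// Illustrative sketch: every name except beforePersistingReplicationOffset is an assumption.
interface OffsetPersistenceHook {
  /**
   * Called by the shipper right before it records the replication offset.
   * Endpoints whose data is already durable after every shipment keep this no-op default.
   */
  default void beforePersistingReplicationOffset() throws IOException {
    // nothing to do: everything shipped so far is already persisted
  }
}

// An S3-backed endpoint would override the hook to make buffered WAL data durable
// before the offset is recorded, e.g. by closing (rolling) its open writers.
class S3BackedEndpointSketch implements OffsetPersistenceHook {
  @Override
  public void beforePersistingReplicationOffset() throws IOException {
    closeOpenWalWriters(); // roll the writers so all entries up to the offset are on S3
  }

  private void closeOpenWalWriters() throws IOException {
    // endpoint-specific flush/close logic omitted in this sketch
  }
}
```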

Contributor:

You cannot close the writer every time something is shipped, because closing and re-opening the same stream is a costly operation, if it is even supported. We have to wait for enough data to be shipped (file size limit) or for the configured time to elapse (time limit) before closing the current stream and opening a new one. This is controlled by the replication endpoint itself.

Contributor:

So you also need to change the logic in ReplicationSourceShipper so that it does not always record the offset after shipping. And I do not think this can only be controlled by the replication endpoint: in ReplicationSourceShipper you know the size of the WAL entries and you also know how much time has elapsed since the last recording, so it should be easy to implement the logic there?

Contributor:

That's an interesting idea. @vinayakphegde @ankitsol wdyt?

@Apache9 Let's say the ReplicationSourceShipper controls when to record the offset. How would it know which kind of replication endpoint it is working with? Does it need to record the offset after every shipment, or should it use a time/size limit? Shall we make it a new attribute of the endpoints?

Contributor:

I prefer we control it by time/size limit.

Even if the endpoint can persist the data after every shipment, we do not need to record the offset every time, right? We just need to make sure that once the ReplicationSourceShipper wants to record the offset, all the data before that offset has been persisted. So we can introduce a beforePersistingReplicationOffset method on the replication endpoint: if you persist the data after every shipment, you just do nothing; if it is the S3-based endpoint, we close the output file to persist the data.

In this way, the ReplicationSourceShipper does not need to know whether the endpoint persists the data after every shipment. And in the future, for HBaseInterClusterReplicationEndpoint, we could also introduce some asynchronous mechanism to increase performance.

Contributor:

> Even if the endpoint can persist the data after every shipment, we do not need to record the offset every time, right?

Are we talking about modifying the behaviour of existing replication endpoints?
Currently both data and offsets are persisted at every shipment. Would you like to change this to be controlled by time and size limits generally?

Author:

@Apache9 This seems like a good approach. We also want it to be both time and size based.

I have two questions regarding the time-based approach:

  1. Should this time-based count run on a separate thread? Currently in ContinuousBackupReplicationEndpoint we implemented it as a separate thread.
  2. Where should we save the time/size-based context: ReplicationSourceShipper or ReplicationSource, considering ReplicationSourceShipper is itself a thread?

CC @anmolnar @vinayakphegde

Contributor:

I think changing the default behavior to be size/time based is OK, and we can make size = 0 mean no size limit and size = -1 mean persisting after every flush; this way we can make the default size limit -1 to keep the old behavior.

Contributor:

Sounds good to me.

Contributor:

> @Apache9 This seems like a good approach. We also want it to be both time and size based.
>
> I have two questions regarding the time-based approach:
>
>   1. Should this time-based count run on a separate thread? Currently in ContinuousBackupReplicationEndpoint we implemented it as a separate thread.
>   2. Where should we save the time/size-based context: ReplicationSourceShipper or ReplicationSource, considering ReplicationSourceShipper is itself a thread?
>
> CC @anmolnar @vinayakphegde

I think we can put the logic in ReplicationSourceShipper. There is a while loop in the thread; after every shipment we calculate the size and the elapsed time and determine whether we should persist the offset.
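A rough sketch of how that per-shipment check could work; the class, field, and method names below are illustrative assumptions, not the actual ReplicationSourceShipper code:

```java
// Illustrative sketch: size = -1 persists after every shipment (old behavior),
// size = 0 disables the size trigger, and a positive value is a byte threshold.
final class OffsetPersistDecision {
  private final long sizeLimitBytes;
  private final long timeLimitMillis; // 0 disables the time trigger
  private long bytesSinceLastPersist = 0;
  private long lastPersistTime = System.currentTimeMillis();

  OffsetPersistDecision(long sizeLimitBytes, long timeLimitMillis) {
    this.sizeLimitBytes = sizeLimitBytes;
    this.timeLimitMillis = timeLimitMillis;
  }

  /** Called once per shipment with the size of the WAL entries just shipped. */
  boolean shouldPersistAfterShipment(long shippedBytes) {
    bytesSinceLastPersist += shippedBytes;
    long elapsed = System.currentTimeMillis() - lastPersistTime;
    boolean bySize = sizeLimitBytes == -1
      || (sizeLimitBytes > 0 && bytesSinceLastPersist >= sizeLimitBytes);
    boolean byTime = timeLimitMillis > 0 && elapsed >= timeLimitMillis;
    if (bySize || byTime) {
      // the shipper would call endpoint.beforePersistingReplicationOffset()
      // and then updateLogPosition(...) before resetting these counters
      bytesSinceLastPersist = 0;
      lastPersistTime = System.currentTimeMillis();
      return true;
    }
    return false;
  }
}
```

The shipper's existing loop would call shouldPersistAfterShipment(...) after each successful replicate() and persist the offset only when it returns true.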

* @param replicateContext a context where WAL entries and other parameters can be obtained.
*/
ReplicationResult replicate(ReplicateContext replicateContext);
boolean replicate(ReplicateContext replicateContext) throws IOException;

// The below methods are inspired by Guava Service. See
// https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.

This file was deleted.

---
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -59,10 +60,11 @@ private void checkCell(Cell cell) {
}

@Override
public ReplicationResult replicate(ReplicateContext replicateContext) {
public boolean replicate(ReplicateContext replicateContext) throws IOException {
replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream())
.forEach(this::checkCell);
return ReplicationResult.COMMITTED;
getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
Contributor:

Why do we need to call this here?

return true;
}

@Override
---
@@ -48,7 +48,6 @@
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -425,7 +424,7 @@ private long parallelReplicate(ReplicateContext replicateContext, List<List<Entr
* Do the shipping logic
*/
@Override
public ReplicationResult replicate(ReplicateContext replicateContext) {
public boolean replicate(ReplicateContext replicateContext) throws IOException {
int sleepMultiplier = 1;
int initialTimeout = replicateContext.getTimeout();

@@ -445,7 +444,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
lastSinkFetchTime = EnvironmentEdgeManager.currentTime();
}
sleepForRetries("No sinks available at peer", sleepMultiplier);
return ReplicationResult.FAILED;
return false;
}

List<List<Entry>> batches = createBatches(replicateContext.getEntries());
@@ -459,7 +458,8 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
try {
// replicate the batches to sink side.
parallelReplicate(replicateContext, batches);
return ReplicationResult.COMMITTED;
getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
Contributor:

OK, I think I get why you call this method here: since here we can make sure that the WAL entries have been persisted, it is OK for us to persist the offset. But I would prefer we follow the old way and call this in ReplicationSourceShipper.

Contributor:

> But I would prefer we follow the old way and call this in ReplicationSourceShipper.

I think it's doable. @ankitsol ?

return true;
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
@@ -468,14 +468,18 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
batches = filterNotExistTableEdits(batches);
if (batches.isEmpty()) {
LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return");
return ReplicationResult.COMMITTED;
getReplicationSource()
.cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
return true;
}
} else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
batches = filterNotExistColumnFamilyEdits(batches);
if (batches.isEmpty()) {
LOG.warn("After filter not exist column family's edits, 0 edits to replicate, "
+ "just return");
return ReplicationResult.COMMITTED;
getReplicationSource()
.cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
return true;
}
} else {
LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
Expand Down Expand Up @@ -507,7 +511,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
}
}
}
return ReplicationResult.FAILED; // in case we exited before replicating
return false; // in case we exited before replicating
}

protected boolean isPeerEnabled() {
---
@@ -41,6 +41,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
@@ -58,7 +60,6 @@
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
@@ -67,13 +68,16 @@
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
* Class that handles the source of a replication stream. Currently does not handle more than 1
* slave cluster. For each slave cluster it selects a random number of peers using a replication
@@ -867,16 +871,12 @@ public long getTotalReplicatedEdits() {
}

@Override
public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) {
public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
String walName = entryBatch.getLastWalPath().getName();
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName);

synchronized (lastEntryBatch) { // Synchronize addition and processing
lastEntryBatch.put(walPrefix, entryBatch);

if (replicated == ReplicationResult.COMMITTED) {
processAndClearEntries();
}
}
}

@@ -893,4 +893,38 @@ private void processAndClearEntries() {
// Clear all processed entries
lastEntryBatch.clear();
}

@Override
public void cleanupHFileRefsAndPersistOffsets(List<Entry> entries) throws IOException {
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);
}
persistOffsets();
}

private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = getPeerId();
if (peerId.contains("-")) {
// peerClusterZnode will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
peerId = peerId.split("-")[0];
}
List<Cell> cells = edit.getCells();
int totalCells = cells.size();
for (int i = 0; i < totalCells; i++) {
Cell cell = cells.get(i);
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
List<WALProtos.StoreDescriptor> stores = bld.getStoresList();
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
List<String> storeFileList = stores.get(j).getStoreFileList();
getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
}
}
}
}
}
---
@@ -34,7 +34,6 @@
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -208,11 +207,15 @@ default boolean isRecovered() {
* @param entryBatch the wal entry batch we just shipped
* @return The instance of queueStorage used by this ReplicationSource.
*/
default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) {
default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {

}

default public void persistOffsets() {

}

default public void cleanupHFileRefsAndPersistOffsets(List<Entry> entries) throws IOException {

}
}