diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index 19cd2733af7b..c315b687b136 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -47,8 +47,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; -import org.apache.hadoop.hbase.replication.ReplicationResult; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.FSHLogProvider; @@ -83,7 +81,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint private final Map walWriters = new ConcurrentHashMap<>(); private final ReentrantLock lock = new ReentrantLock(); - private ReplicationSourceInterface replicationSource; private Configuration conf; private BackupFileSystemManager backupFileSystemManager; private UUID peerUUID; @@ -98,7 +95,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint @Override public void init(Context context) throws IOException { super.init(context); - this.replicationSource = context.getReplicationSource(); this.peerId = context.getPeerId(); this.conf = HBaseConfiguration.create(context.getConfiguration()); @@ -155,7 +151,7 @@ private void flushAndBackupSafely() { try { LOG.info("{} Periodic WAL flush triggered", Utils.logPeerId(peerId)); flushWriters(); - replicationSource.persistOffsets(); + getReplicationSource().persistOffsets(); LOG.info("{} Periodic WAL flush and offset persistence completed successfully", Utils.logPeerId(peerId)); } catch (IOException e) { @@ -220,11 +216,11 @@ public EmptyEntriesPolicy getEmptyEntriesPolicy() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) { final List entries = replicateContext.getEntries(); if (entries.isEmpty()) { LOG.debug("{} No WAL entries to replicate", Utils.logPeerId(peerId)); - return ReplicationResult.SUBMITTED; + return true; } LOG.debug("{} Received {} WAL entries for replication", Utils.logPeerId(peerId), @@ -253,15 +249,16 @@ public ReplicationResult replicate(ReplicateContext replicateContext) { Utils.logPeerId(peerId)); flushWriters(); LOG.debug("{} Replication committed after WAL flush", Utils.logPeerId(peerId)); - return ReplicationResult.COMMITTED; + getReplicationSource().cleanupHFileRefsAndPersistOffsets(entries); + return true; } LOG.debug("{} Replication submitted successfully", Utils.logPeerId(peerId)); - return ReplicationResult.SUBMITTED; + return true; } catch (IOException e) { LOG.error("{} Replication failed. 
Error details: {}", Utils.logPeerId(peerId), e.getMessage(), e); - return ReplicationResult.FAILED; + return false; } finally { lock.unlock(); } @@ -277,8 +274,8 @@ public ReplicationResult replicate(ReplicateContext replicateContext) { private void updateLastReplicatedTimestampForContinuousBackup() throws IOException { try (final Connection conn = ConnectionFactory.createConnection(conf); BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) { - backupSystemTable.updateBackupCheckpointTimestamp(replicationSource.getServerWALsBelongTo(), - latestWALEntryTimestamp); + backupSystemTable.updateBackupCheckpointTimestamp( + getReplicationSource().getServerWALsBelongTo(), latestWALEntryTimestamp); } } @@ -379,7 +376,7 @@ private void close() { lock.lock(); try { flushWriters(); - replicationSource.persistOffsets(); + getReplicationSource().persistOffsets(); } catch (IOException e) { LOG.error("{} Failed to Flush Open Wal Writers: {}", Utils.logPeerId(peerId), e.getMessage(), e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 1a39dd222a83..f2a727a9c83b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY = "hbase.replication.source.custom.walentryfilters"; protected Context ctx; + private ReplicationSourceInterface replicationSource; @Override public void init(Context context) throws IOException { @@ -47,6 +49,7 @@ public void init(Context context) throws IOException { if (this.ctx != null) { ReplicationPeer peer = this.ctx.getReplicationPeer(); + this.replicationSource = context.getReplicationSource(); if (peer != null) { peer.registerPeerConfigListener(this); } else { @@ -120,4 +123,8 @@ public boolean canReplicateToSameCluster() { public boolean isStarting() { return state() == State.STARTING; } + + public ReplicationSourceInterface getReplicationSource() { + return replicationSource; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index fbb6b6b9ef10..30a4ba92dbe1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -216,7 +216,7 @@ public int getTimeout() { * the context are assumed to be persisted in the target cluster. * @param replicateContext a context where WAL entries and other parameters can be obtained. */ - ReplicationResult replicate(ReplicateContext replicateContext); + boolean replicate(ReplicateContext replicateContext) throws IOException; // The below methods are inspired by Guava Service. See // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java deleted file mode 100644 index 03ed0ce6799f..000000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.yetus.audience.InterfaceAudience; - -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public enum ReplicationResult { - /* Batch has been replicated and persisted successfully. */ - COMMITTED, - - /* Batch has been submitted for replication, but not persisted yet. */ - SUBMITTED, - - /* Batch replicaton failed, should be re-tried */ - FAILED -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java index a9674407bd2a..99f0426e42c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.IOException; import java.util.UUID; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -59,10 +60,11 @@ private void checkCell(Cell cell) { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) throws IOException { replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream()) .forEach(this::checkCell); - return ReplicationResult.COMMITTED; + getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries()); + return true; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 4f9a4909d784..89fed9681312 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import 
org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -425,7 +424,7 @@ private long parallelReplicate(ReplicateContext replicateContext, List> batches = createBatches(replicateContext.getEntries()); @@ -459,7 +458,8 @@ public ReplicationResult replicate(ReplicateContext replicateContext) { try { // replicate the batches to sink side. parallelReplicate(replicateContext, batches); - return ReplicationResult.COMMITTED; + getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries()); + return true; } catch (IOException ioe) { if (ioe instanceof RemoteException) { if (dropOnDeletedTables && isTableNotFoundException(ioe)) { @@ -468,14 +468,18 @@ public ReplicationResult replicate(ReplicateContext replicateContext) { batches = filterNotExistTableEdits(batches); if (batches.isEmpty()) { LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return"); - return ReplicationResult.COMMITTED; + getReplicationSource() + .cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries()); + return true; } } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) { batches = filterNotExistColumnFamilyEdits(batches); if (batches.isEmpty()) { LOG.warn("After filter not exist column family's edits, 0 edits to replicate, " + "just return"); - return ReplicationResult.COMMITTED; + getReplicationSource() + .cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries()); + return true; } } else { LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(), @@ -507,7 +511,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) { } } } - return ReplicationResult.FAILED; // in case we exited before replicating + return false; // in case we exited before replicating } protected boolean isPeerEnabled() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 73421dced454..0a29c9fe3463 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -41,6 +41,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; @@ -58,7 +60,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; @@ -67,6 +68,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; import 
org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +76,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + /** * Class that handles the source of a replication stream. Currently does not handle more than 1 * slave cluster. For each slave cluster it selects a random number of peers using a replication @@ -867,16 +871,12 @@ public long getTotalReplicatedEdits() { } @Override - public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) { + public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { String walName = entryBatch.getLastWalPath().getName(); String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName); synchronized (lastEntryBatch) { // Synchronize addition and processing lastEntryBatch.put(walPrefix, entryBatch); - - if (replicated == ReplicationResult.COMMITTED) { - processAndClearEntries(); - } } } @@ -893,4 +893,38 @@ private void processAndClearEntries() { // Clear all processed entries lastEntryBatch.clear(); } + + @Override + public void cleanupHFileRefsAndPersistOffsets(List<Entry> entries) throws IOException { + // Clean up hfile references + for (Entry entry : entries) { + cleanUpHFileRefs(entry.getEdit()); + LOG.trace("shipped entry {}: ", entry); + } + persistOffsets(); + } + + private void cleanUpHFileRefs(WALEdit edit) throws IOException { + String peerId = getPeerId(); + if (peerId.contains("-")) { + // peerClusterZnode will be in the form peerId + "-" + rsZNode. + // A peerId will not have "-" in its name, see HBASE-11394 + peerId = peerId.split("-")[0]; + } + List<Cell> cells = edit.getCells(); + int totalCells = cells.size(); + for (int i = 0; i < totalCells; i++) { + Cell cell = cells.get(i); + if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + List<WALProtos.StoreDescriptor> stores = bld.getStoresList(); + int totalStores = stores.size(); + for (int j = 0; j < totalStores; j++) { + List<String> storeFileList = stores.get(j).getStoreFileList(); + getSourceManager().cleanUpHFileRefs(peerId, storeFileList); + getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size()); + } + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index f482cc73e717..cd3a3e07ba99 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -208,11 +207,15 @@ default boolean isRecovered() { * @param entryBatch the wal entry batch we just shipped * @return The instance of queueStorage used by this ReplicationSource.
*/ - default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) { + default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { } default public void persistOffsets() { } + + default public void cleanupHFileRefsAndPersistOffsets(List<Entry> entries) throws IOException { + + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index f45c8762683a..0bd71237aa6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -21,27 +21,18 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries; import com.google.errorprone.annotations.RestrictedApi; -import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; - /** * This thread reads entries from a queue and ships them. Entries are placed onto the queue by * ReplicationSourceWALReaderThread */ @@ -169,8 +160,7 @@ void shipEdits(WALEntryBatch entryBatch) { * However, some endpoints (e.g., asynchronous S3 backups) may buffer writes and delay actual * persistence. In such cases, we must avoid committing the WAL position prematurely. */ - final ReplicationResult result = getReplicationResult(); - updateLogPosition(entryBatch, result); + updateLogPosition(entryBatch); return; } int currentSize = (int) entryBatch.getHeapSize(); @@ -197,23 +187,17 @@ void shipEdits(WALEntryBatch entryBatch) { long startTimeNs = System.nanoTime(); // send the edits to the endpoint. Will block until the edits are shipped and acknowledged - ReplicationResult replicated = source.getReplicationEndpoint().replicate(replicateContext); + boolean replicated = source.getReplicationEndpoint().replicate(replicateContext); long endTimeNs = System.nanoTime(); - if (replicated == ReplicationResult.FAILED) { + if (!replicated) { continue; } else { sleepMultiplier = Math.max(sleepMultiplier - 1, 0); } - if (replicated == ReplicationResult.COMMITTED) { - // Clean up hfile references - for (Entry entry : entries) { - cleanUpHFileRefs(entry.getEdit()); - LOG.trace("shipped entry {}: ", entry); - } - } + // Log and clean up WAL logs - updateLogPosition(entryBatch, replicated); + updateLogPosition(entryBatch); // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) // this sizeExcludeBulkLoad has to use same calculation that when calling @@ -246,42 +230,11 @@ void shipEdits(WALEntryBatch entryBatch) { } } - private ReplicationResult getReplicationResult() { - EmptyEntriesPolicy policy = source.getReplicationEndpoint().getEmptyEntriesPolicy(); - return (policy == EmptyEntriesPolicy.COMMIT) - ? ReplicationResult.COMMITTED - : ReplicationResult.SUBMITTED; - } - - private void cleanUpHFileRefs(WALEdit edit) throws IOException { - String peerId = source.getPeerId(); - if (peerId.contains("-")) { - // peerClusterZnode will be in the form peerId + "-" + rsZNode. - // A peerId will not have "-" in its name, see HBASE-11394 - peerId = peerId.split("-")[0]; - } - List<Cell> cells = edit.getCells(); - int totalCells = cells.size(); - for (int i = 0; i < totalCells; i++) { - Cell cell = cells.get(i); - if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { - BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); - List<StoreDescriptor> stores = bld.getStoresList(); - int totalStores = stores.size(); - for (int j = 0; j < totalStores; j++) { - List<String> storeFileList = stores.get(j).getStoreFileList(); - source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList); - source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size()); - } - } - } - } - @RestrictedApi( explanation = "Package-private for test visibility only. Do not use outside tests.", link = "", allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)") - boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { + boolean updateLogPosition(WALEntryBatch batch) { boolean updated = false; // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file // record on zk, so let's call it. The last wal position maybe zero if end of file is true and @@ -291,7 +244,7 @@ boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) || batch.getLastWalPosition() != currentPosition ) { - source.logPositionAndCleanOldLogs(batch, replicated); + source.logPositionAndCleanOldLogs(batch); updated = true; } // if end of file is true, then we can just skip to the next file in queue.
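Note for reviewers: the "asynchronous S3 backups" caveat removed above still applies under the boolean contract, only the mechanism changes. A buffering endpoint simply withholds the `cleanupHFileRefsAndPersistOffsets()` call until its writes are durable, which is how `ContinuousBackupReplicationEndpoint` behaves earlier in this patch. A method-level sketch follows (class scaffolding as in the earlier example; `bufferEntries()`, `flushNeeded()`, and `flushBuffers()` are hypothetical helpers):

```java
@Override
public boolean replicate(ReplicateContext replicateContext) {
  try {
    // Stage the writes; they are not yet durable on the sink.
    bufferEntries(replicateContext.getEntries());
    if (flushNeeded()) {
      flushBuffers(); // make staged writes durable
      // Only now is it safe to drop HFile refs and persist WAL offsets.
      getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
    }
    return true; // batch accepted; offsets persist on the next flush otherwise
  } catch (IOException e) {
    return false; // shipper will retry the batch
  }
}
```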
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index a32ce78b0c78..f84f03e491e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -64,7 +63,7 @@ public void peerConfigUpdated(ReplicationPeerConfig rpc) { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) throws IOException { if (!delegator.canReplicateToSameCluster()) { // Only when the replication is inter cluster replication we need to // convert the visibility tags to diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java index f0e627316cd4..4fc4249d7c54 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.IOException; import java.util.UUID; import org.apache.yetus.audience.InterfaceAudience; @@ -42,8 +43,9 @@ public WALEntryFilter getWALEntryfilter() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { - return ReplicationResult.COMMITTED; + public boolean replicate(ReplicateContext replicateContext) throws IOException { + getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries()); + return true; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index a8c76033d02d..f54c39316997 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -81,7 +81,7 @@ public UUID getPeerUUID() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) { synchronized (WRITER) { try { for (Entry entry : replicateContext.getEntries()) { @@ -92,7 +92,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) { throw new UncheckedIOException(e); } } - return ReplicationResult.COMMITTED; + return true; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java index d9a75b8ca8a3..67254f893342 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -199,8 +199,8 @@ protected Collection fetchPeerAddresses() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { - return ReplicationResult.FAILED; + public boolean replicate(ReplicateContext replicateContext) { + return false; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java index c98b46c8e4be..70cae18b4561 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java @@ -127,9 +127,9 @@ public WALEntryFilter getWALEntryfilter() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) { REPLICATED.set(true); - return ReplicationResult.COMMITTED; + return true; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index a8bea44750fb..0ca774ab8248 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -430,7 +430,7 @@ public ReplicationEndpointTest() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) throws IOException { replicateCount.incrementAndGet(); replicatedEntries.addAll(replicateContext.getEntries()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 77cd5da8de0a..241d56662d6e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -463,10 +463,10 @@ public UUID getPeerUUID() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) { replicateCount.incrementAndGet(); lastEntries = new ArrayList<>(replicateContext.entries); - return ReplicationResult.COMMITTED; + return true; } @Override @@ -526,12 +526,12 @@ public void init(Context context) throws IOException { } @Override - public ReplicationResult replicate(ReplicateContext context) { + public boolean replicate(ReplicateContext context) { try { Thread.sleep(duration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return ReplicationResult.FAILED; + return false; } return super.replicate(context); } @@ -548,9 +548,9 @@ public InterClusterReplicationEndpointForTest() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { - ReplicationResult success = super.replicate(replicateContext); - if (success == ReplicationResult.COMMITTED) { + public boolean replicate(ReplicateContext replicateContext) throws IOException { + boolean success = super.replicate(replicateContext); + if (success) { replicateCount.addAndGet(replicateContext.entries.size()); } 
return success; @@ -577,7 +577,7 @@ public static class ReplicationEndpointReturningFalse extends ReplicationEndpoin static AtomicBoolean replicated = new AtomicBoolean(false); @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) { try { // check row doAssert(row); @@ -589,7 +589,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) { LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get()); replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false - return replicated.get() ? ReplicationResult.COMMITTED : ReplicationResult.FAILED; + return replicated.get(); } } @@ -598,14 +598,14 @@ public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEnd static AtomicReference ex = new AtomicReference<>(null); @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) { try { super.replicate(replicateContext); doAssert(row); } catch (Exception e) { ex.set(e); } - return ReplicationResult.COMMITTED; + return true; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java index 50b0911970a3..b6b05ee864e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertArrayEquals; +import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -71,7 +72,7 @@ public class TestVerifyCellsReplicationEndpoint { public static final class EndpointForTest extends VerifyWALEntriesReplicationEndpoint { @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) throws IOException { LOG.info(replicateContext.getEntries().toString()); replicateContext.entries.stream().map(WAL.Entry::getEdit).map(WALEdit::getCells) .forEachOrdered(CELLS::addAll); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java index d7b5bcdcccbf..66f04dca36d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -95,7 +94,7 @@ public UUID getPeerUUID() { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean 
replicate(ReplicateContext replicateContext) { synchronized (WRITER) { try { for (Entry entry : replicateContext.getEntries()) { @@ -106,7 +105,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) { throw new UncheckedIOException(e); } } - return ReplicationResult.COMMITTED; + return true; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 25eef51ff681..99e485b9edab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -531,7 +530,7 @@ public void testEmptyBatchCommitsPositionForCommitEndpoint() { shipper.shipEdits(emptyBatch); // With default (COMMIT) policy, empty entry batch should advance WAL position - Mockito.verify(shipper).updateLogPosition(emptyBatch, ReplicationResult.COMMITTED); + Mockito.verify(shipper).updateLogPosition(emptyBatch); } /** @@ -552,7 +551,7 @@ public void testEmptyBatchSubmitsPositionForSubmitEndpoint() { shipper.shipEdits(emptyBatch); // With SUBMIT policy, empty entry batch should NOT advance WAL position - Mockito.verify(shipper).updateLogPosition(emptyBatch, ReplicationResult.SUBMITTED); + Mockito.verify(shipper).updateLogPosition(emptyBatch); } private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index c99f25380de4..663b444dc4e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; @@ -95,13 +94,13 @@ public static final class ReplicationEndpointForTest extends DummyReplicationEnd private String clusterKey; @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) { // if you want to block the replication, for example, do not want the recovered source to be // removed if (clusterKey.endsWith("error")) { throw new RuntimeException("Inject error"); } - return ReplicationResult.COMMITTED; + return true; } @Override diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index cdbd1c73a2a7..3bb8457af314 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -219,7 +218,7 @@ public static void setEntriesCount(int i) { } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { + public boolean replicate(ReplicateContext replicateContext) throws IOException { try { await(); } catch (InterruptedException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index ffbc0d2cee5a..68d7378991ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.SecurityTests; @@ -474,8 +473,8 @@ public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint, } @Override - public ReplicationResult replicate(ReplicateContext replicateContext) { - ReplicationResult ret = super.replicate(replicateContext); + public boolean replicate(ReplicateContext replicateContext) throws IOException { + boolean ret = super.replicate(replicateContext); lastEntries = replicateContext.getEntries(); replicateCount.incrementAndGet(); return ret;
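Test code follows the same pattern throughout. A hedged sketch of the assertion style used by the updated `TestReplicationSource` cases above (setup is abbreviated; `createShipper()` is a hypothetical helper and the `WALEntryBatch` constructor arguments are illustrative):

```java
WALEntryBatch emptyBatch = new WALEntryBatch(0, logPath);
ReplicationSourceShipper shipper = Mockito.spy(createShipper(source));
shipper.shipEdits(emptyBatch);
// With ReplicationResult gone, both the COMMIT and SUBMIT empty-entries
// policies verify the same single-argument call:
Mockito.verify(shipper).updateLogPosition(emptyBatch);
```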