From a98552553ff2928b1e1296eebcbe6ed8e851ef8d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 14 Nov 2024 12:40:13 +0800 Subject: [PATCH 1/2] [improve][broker] PIP-391: Enable batch index ACK by default --- conf/broker.conf | 2 +- .../org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- .../broker/admin/v3/AdminApiTransactionTest.java | 1 - .../namespace/OwnerShipForCurrentServerTestBase.java | 1 - .../apache/pulsar/broker/service/BatchMessageTest.java | 1 - .../service/BatchMessageWithBatchIndexLevelTest.java | 10 ---------- .../pulsar/broker/service/SubscriptionSeekTest.java | 1 - .../broker/transaction/TransactionProduceTest.java | 2 -- .../pulsar/broker/transaction/TransactionTest.java | 4 ---- .../buffer/TransactionLowWaterMarkTest.java | 2 -- .../buffer/TransactionStablePositionTest.java | 2 -- .../coordinator/TransactionMetaStoreTestBase.java | 1 - .../pendingack/PendingAckInMemoryDeleteTest.java | 4 ---- .../pendingack/PendingAckPersistentTest.java | 8 -------- .../pendingack/impl/MLPendingAckStoreTest.java | 1 - .../apache/pulsar/client/api/CustomMessageIdTest.java | 1 - .../pulsar/client/api/UnloadSubscriptionTest.java | 4 +--- .../pulsar/client/impl/BatchMessageIndexAckTest.java | 4 ---- .../apache/pulsar/client/impl/NegativeAcksTest.java | 3 --- .../pulsar/client/impl/TransactionEndToEndTest.java | 10 ---------- .../org/apache/pulsar/client/api/ConsumerBuilder.java | 1 + .../client/impl/conf/ConsumerConfigurationData.java | 2 +- .../pulsar/client/impl/ConsumerBuilderImplTest.java | 2 +- .../tests/integration/transaction/TransactionTest.java | 1 - 24 files changed, 6 insertions(+), 64 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 4e246826ffbf9..c26899f6e4201 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -667,7 +667,7 @@ delayedDeliveryFixedDelayDetectionLookahead=50000 delayedDeliveryMaxDelayInMillis=0 # Whether to enable acknowledge of batch local index. -acknowledgmentAtBatchIndexLevelEnabled=false +acknowledgmentAtBatchIndexLevelEnabled=true # Enable tracking of replicated subscriptions state across clusters. enableReplicatedSubscriptions=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d66ceafdfad73..3226c1bbbce0d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -423,7 +423,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece private long delayedDeliveryMaxDelayInMillis = 0; @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index") - private boolean acknowledgmentAtBatchIndexLevelEnabled = false; + private boolean acknowledgmentAtBatchIndexLevelEnabled = true; @FieldContext( category = CATEGORY_WEBSOCKET, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index e32af29c7e962..7f378e411f4d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -859,7 +859,6 @@ public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception { @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .subscriptionName(subscriptionName) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Shared) .isAckReceiptEnabled(true) .topic(topic) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java index 46e8989ac3df4..5225ec120bd26 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java @@ -75,7 +75,6 @@ protected void startBroker() throws Exception { conf.setConfigurationMetadataStoreUrl("zk:localhost:3181"); conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); conf.setBookkeeperClientExposeStatsToPrometheus(true); - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); conf.setBrokerShutdownTimeoutMs(0L); conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index b821c0cc66351..55d759ba74079 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -1037,7 +1037,6 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp .isAckReceiptEnabled(true) .subscriptionName(subscriptionName) .subscriptionType(subType) - .enableBatchIndexAcknowledgment(true) .subscribe(); @Cleanup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index f21ac130e3cfd..1913eac640fd1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -69,7 +69,6 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { @BeforeClass @Override protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); super.baseSetup(); } @@ -87,7 +86,6 @@ public void testBatchMessageAck() { .subscriptionName(subscriptionName) .receiverQueueSize(50) .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) .subscribe(); @@ -212,7 +210,6 @@ public void testBatchMessageMultiNegtiveAck() throws Exception{ .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .receiverQueueSize(10) - .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) .subscribe(); @@ -254,7 +251,6 @@ public void testBatchMessageMultiNegtiveAck() throws Exception{ .subscriptionName(subscriptionName2) .subscriptionType(SubscriptionType.Shared) .receiverQueueSize(10) - .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) .subscribe(); @Cleanup @@ -310,7 +306,6 @@ public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Excepti .isAckReceiptEnabled(true) .subscriptionName(subName) .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .subscribe(); @Cleanup @@ -322,7 +317,6 @@ public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Excepti .isAckReceiptEnabled(true) .subscriptionName(subName) .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .subscribe(); for (int i = 0; i < 5; i++) { @@ -385,7 +379,6 @@ public void testNegativeAckAndLongAckDelayWillNotLeadRepeatConsume() throws Exce .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .negativeAckRedeliveryDelay(redeliveryDelaySeconds, TimeUnit.SECONDS) - .enableBatchIndexAcknowledgment(true) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .acknowledgmentGroupTime(1, TimeUnit.HOURS) .subscribe(); @@ -461,7 +454,6 @@ public void testMixIndexAndNonIndexUnAckMessageCount() throws Exception { .subscriptionName("sub") .subscriptionType(SubscriptionType.Shared) .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS) - .enableBatchIndexAcknowledgment(true) .isAckReceiptEnabled(true) .subscribe(); @@ -492,7 +484,6 @@ public void testUnAckMessagesWhenConcurrentDeliveryAndAck() throws Exception { .topic(topicName) .receiverQueueSize(receiverQueueSize) .subscriptionName(subName) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Shared) .isAckReceiptEnabled(true); @@ -666,7 +657,6 @@ public void testPermitsIfHalfAckBatchMessage() throws Exception { .topic(topicName) .receiverQueueSize(receiverQueueSize) .subscriptionName(subName) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Shared) .isAckReceiptEnabled(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 4970dc8818854..3f9c8e95fa04a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -87,7 +87,6 @@ protected void setup() throws Exception { conf.setDefaultRetentionSizeInMB(100); conf.setDefaultRetentionTimeInMinutes(100); super.baseSetup(); - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); } @AfterClass(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java index 3d7ab902bf494..037dee1963508 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java @@ -283,7 +283,6 @@ public void ackCommitTest() throws Exception { .topic(ACK_COMMIT_TOPIC) .subscriptionName(subscriptionName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Shared) .subscribe(); @@ -348,7 +347,6 @@ public void ackAbortTest() throws Exception { .subscriptionName(subscriptionName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .enableBatchIndexAcknowledgment(true) - .subscriptionType(SubscriptionType.Shared) .subscribe(); Awaitility.await().until(consumer::isConnected); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 35c9048ebb554..5972c1cc19099 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -397,7 +397,6 @@ public Consumer getConsumer(String topicName, String subName) throws Pul .topic(topicName) .subscriptionName(subName) .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .subscribe(); } @@ -1451,9 +1450,6 @@ public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception { public void testPendingAckBatchMessageCommit() throws Exception { String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit"; - // enable batch index ack - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); - @Cleanup Producer producer = pulsarClient .newProducer(Schema.BYTES) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index 818b854ffe941..94036a3c358b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -117,7 +117,6 @@ public void testTransactionBufferLowWaterMark() throws Exception { .topic(TOPIC) .subscriptionName("test") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Failover) .subscribe(); final String TEST1 = "test1"; @@ -196,7 +195,6 @@ public void testPendingAckLowWaterMark() throws Exception { .topic(TOPIC) .subscriptionName(subName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Failover) .subscribe(); final String TEST1 = "test1"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java index 0b50f91fd403c..4ad43a7bacf26 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java @@ -91,7 +91,6 @@ public void commitTxnTest() throws Exception { .topic(TOPIC) .subscriptionName("test") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Failover) .subscribe(); final String TEST1 = "test1"; @@ -139,7 +138,6 @@ public void abortTxnTest() throws Exception { .topic(TOPIC) .subscriptionName("test") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Failover) .subscribe(); final String TEST1 = "test1"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index 5bf48932f3687..f91c4fdfdc5d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -74,7 +74,6 @@ protected final void setup() throws Exception { config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config.setDefaultNumberOfNamespaceBundles(1); config.setLoadBalancerEnabled(false); - config.setAcknowledgmentAtBatchIndexLevelEnabled(true); config.setTransactionCoordinatorEnabled(true); configurations[i] = config; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index 1b4fd451c104f..4f773e8a124b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -57,7 +57,6 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase { private static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0); } @@ -76,7 +75,6 @@ public void txnAckTestNoBatchAndSharedSubMemoryDeleteTest() throws Exception { .topic(normalTopic) .isAckReceiptEnabled(true) .subscriptionName(subscriptionName) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Shared) .ackTimeout(2, TimeUnit.SECONDS) .acknowledgmentGroupTime(0, TimeUnit.MICROSECONDS) @@ -155,7 +153,6 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception { Consumer consumer = pulsarClient.newConsumer() .topic(normalTopic) .subscriptionName(subscriptionName) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Shared) .subscribe(); @@ -273,7 +270,6 @@ public void testPendingAckClearPositionIsSmallerThanMarkDelete() throws Exceptio Consumer consumer = pulsarClient.newConsumer() .topic(normalTopic) .subscriptionName(subscriptionName) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Shared) .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index fc6a10e385a54..96e3d087d9366 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -223,7 +223,6 @@ public void individualPendingAckReplayTest() throws Exception { .topic(PENDING_ACK_REPLAY_TOPIC) .subscriptionName(subName) .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .subscribe(); Transaction abortTxn = pulsarClient.newTransaction() @@ -336,7 +335,6 @@ public void testPendingAckMetrics() throws Exception { .topic(PENDING_ACK_REPLAY_TOPIC) .subscriptionName(subName) .subscriptionType(SubscriptionType.Exclusive) - .enableBatchIndexAcknowledgment(true) .subscribe(); for (int a = 0; a < messageCount; a++) { @@ -447,7 +445,6 @@ public void cumulativePendingAckReplayTest() throws Exception { .topic(PENDING_ACK_REPLAY_TOPIC) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) - .enableBatchIndexAcknowledgment(true) .subscribe(); Transaction abortTxn = pulsarClient.newTransaction() @@ -534,7 +531,6 @@ private void testDeleteSubThenDeletePendingAckManagedLedger() throws Exception { .topic(topic) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) - .enableBatchIndexAcknowledgment(true) .subscribe(); consumer.close(); @@ -563,7 +559,6 @@ private void testDeleteTopicThenDeletePendingAckManagedLedger() throws Exception .topic(topic) .subscriptionName(subName1) .subscriptionType(SubscriptionType.Failover) - .enableBatchIndexAcknowledgment(true) .subscribe(); consumer1.close(); @@ -573,7 +568,6 @@ private void testDeleteTopicThenDeletePendingAckManagedLedger() throws Exception .topic(topic) .subscriptionName(subName2) .subscriptionType(SubscriptionType.Failover) - .enableBatchIndexAcknowledgment(true) .subscribe(); consumer2.close(); @@ -699,7 +693,6 @@ public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception { .topic(topic) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) - .enableBatchIndexAcknowledgment(true) .subscribe(); @Cleanup @@ -827,7 +820,6 @@ public void testTransactionConflictExceptionWhenAckBatchMessage() throws Excepti @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .subscriptionName(subscriptionName) - .enableBatchIndexAcknowledgment(true) .subscriptionType(SubscriptionType.Exclusive) .isAckReceiptEnabled(true) .topic(topic) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java index 6bbfb25ee2ff7..6c05d6073cf4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java @@ -73,7 +73,6 @@ public class MLPendingAckStoreTest extends TransactionTestBase { @BeforeClass @Override protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setUpBase(1, 1, NAMESPACE1 + "/test", 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java index 52bfc9dda37e4..d64a2d8b7e465 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java @@ -83,7 +83,6 @@ public void testAcknowledgment(boolean enableBatching) throws Exception { final var consumer = pulsarClient.newConsumer(Schema.INT32) .topic(topic) .subscriptionName("sub") - .enableBatchIndexAcknowledgment(true) .isAckReceiptEnabled(true) .subscribe(); for (int i = 0; i < 10; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java index 22f7a5d6a43e4..7f5da9d12f81a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java @@ -60,7 +60,6 @@ protected void doInitConf() throws Exception { super.doInitConf(); conf.setSystemTopicEnabled(false); conf.setTransactionCoordinatorEnabled(false); - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); } @AfterClass(alwaysRun = true) @@ -243,7 +242,6 @@ private Consumer createConsumer(String topicName, String subName, Subscr .subscriptionName(subName) .subscriptionType(subType) .isAckReceiptEnabled(true) - .enableBatchIndexAcknowledgment(true) .subscribe(); return consumer; } @@ -264,4 +262,4 @@ private MessagesEntry receiveAllMessages(Consumer consumer) throws Excep private record MessagesEntry(Set messageSet, Set messageIdSet) {} -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java index 14549c0b7d091..71c29df05a872 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java @@ -55,7 +55,6 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { @BeforeMethod @Override protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); super.internalSetup(); super.producerBaseSetup(); doReturn(CompletableFuture.completedFuture(new LedgerMetadata() { @@ -174,7 +173,6 @@ public void testBatchMessageIndexAckForSharedSubscription(boolean ackReceiptEnab .receiverQueueSize(100) .isAckReceiptEnabled(ackReceiptEnabled) .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) .subscribe(); @@ -254,7 +252,6 @@ public void testBatchMessageIndexAckForExclusiveSubscription(boolean ackReceiptE .subscriptionName("sub") .receiverQueueSize(100) .isAckReceiptEnabled(ackReceiptEnabled) - .enableBatchIndexAcknowledgment(true) .subscribe(); @Cleanup @@ -324,7 +321,6 @@ public void testDoNotRecycleAckSetMultipleTimes() throws Exception { Consumer consumer = pulsarClient.newConsumer() .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS) .topic(topic) - .enableBatchIndexAcknowledgment(true) .subscriptionName("test") .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 182c952eac82d..10f4f666607af 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -380,7 +380,6 @@ public void testNegativeAcksWithBatch() throws Exception { @Test public void testNegativeAcksWithBatchAckEnabled() throws Exception { cleanup(); - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setup(); String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatchAckEnabled"); @@ -390,7 +389,6 @@ public void testNegativeAcksWithBatchAckEnabled() throws Exception { .subscriptionName("sub1") .acknowledgmentGroupTime(0, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) .subscribe(); @@ -437,7 +435,6 @@ public void testFailoverConsumerBatchCumulateAck() throws Exception { .topic(topic) .subscriptionName("sub") .subscriptionType(SubscriptionType.Failover) - .enableBatchIndexAcknowledgment(true) .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS) .receiverQueueSize(10) .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 05245abf3741e..52f230733eede 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -98,7 +98,6 @@ public class TransactionEndToEndTest extends TransactionTestBase { private static final int waitTimeForCannotReceiveMsgInSec = 5; @BeforeClass(alwaysRun = true) protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION); admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1); } @@ -138,7 +137,6 @@ private void testIndividualAckAbortFilterAckSetInPendingAckState() throws Except .isAckReceiptEnabled(true) .subscriptionName("test") .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .subscribe(); for (int i = 0; i < count; i++) { @@ -205,7 +203,6 @@ private void testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean enabl .isAckReceiptEnabled(true) .subscriptionName("test") .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .subscribe(); for (int i = 0; i < count; i++) { @@ -237,7 +234,6 @@ private void testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean enabl .isAckReceiptEnabled(true) .subscriptionName("test") .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .subscribe(); Message message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); @@ -320,7 +316,6 @@ private void produceCommitTest(boolean enableBatch) throws Exception { .newConsumer() .topic(TOPIC_OUTPUT) .subscriptionName("test") - .enableBatchIndexAcknowledgment(true) .subscribe(); Awaitility.await().until(consumer::isConnected); @@ -393,7 +388,6 @@ public void produceAbortTest() throws Exception { .topic(TOPIC_OUTPUT) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionName(subName) - .enableBatchIndexAcknowledgment(true) .subscribe(); Awaitility.await().until(consumer::isConnected); @@ -545,7 +539,6 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, Consumer consumer = pulsarClient.newConsumer() .topic(normalTopic) .subscriptionName("test") - .enableBatchIndexAcknowledgment(true) .subscriptionType(subscriptionType) .subscribe(); Awaitility.await().until(consumer::isConnected); @@ -648,7 +641,6 @@ public void txnMessageAckTest() throws Exception { .newConsumer() .topic(topic) .subscriptionName(subName) - .enableBatchIndexAcknowledgment(true) .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS) .subscribe(); Awaitility.await().until(consumer::isConnected); @@ -773,7 +765,6 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri Consumer consumer = pulsarClient.newConsumer() .topic(normalTopic) .subscriptionName("test") - .enableBatchIndexAcknowledgment(true) .subscriptionType(subscriptionType) .ackTimeout(1, TimeUnit.MINUTES) .subscribe(); @@ -1156,7 +1147,6 @@ public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) th .newConsumer() .topic(topic) .subscriptionName(subName) - .enableBatchIndexAcknowledgment(true) .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS) .subscribe(); Awaitility.await().until(consumer::isConnected); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 8eaf5ca969f67..142c474114912 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -733,6 +733,7 @@ public interface ConsumerBuilder extends Cloneable { /** * Enable or disable batch index acknowledgment. To enable this feature, ensure batch index acknowledgment * is enabled on the broker side. + * Default: true */ ConsumerBuilder enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgmentEnabled); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index dc9251a975c39..4be8c4ed73e90 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -400,7 +400,7 @@ public int getMaxPendingChuckedMessage() { @JsonIgnore private KeySharedPolicy keySharedPolicy; - private boolean batchIndexAckEnabled = false; + private boolean batchIndexAckEnabled = true; private boolean ackReceiptEnabled = false; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index c103712d40055..5e1d6fb956c17 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -566,7 +566,7 @@ public void testLoadConfNotModified() { assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60); assertNull(configurationData.getReplicateSubscriptionState()); assertFalse(configurationData.isResetIncludeHead()); - assertFalse(configurationData.isBatchIndexAckEnabled()); + assertTrue(configurationData.isBatchIndexAckEnabled()); assertFalse(configurationData.isAckReceiptEnabled()); assertFalse(configurationData.isPoolMessages()); assertFalse(configurationData.isStartPaused()); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java index e4bb384ff2c87..16d73c89b2ea0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java @@ -98,7 +98,6 @@ public void transferNormalTest(Supplier serviceUrl) throws Exception { .subscriptionName("integration-test") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Shared) - .enableBatchIndexAcknowledgment(true) .subscribe(); Awaitility.await().until(transferConsumer::isConnected); log.info("transfer consumer create finished"); From 29dfe317ff50fe25c94dfddadf5def5b415ca7da Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 23 Apr 2025 20:31:23 +0800 Subject: [PATCH 2/2] Fix wrong change --- .../pulsar/broker/transaction/TransactionProduceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java index 037dee1963508..4054d68226bd2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java @@ -346,7 +346,7 @@ public void ackAbortTest() throws Exception { .topic(ACK_ABORT_TOPIC) .subscriptionName(subscriptionName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .enableBatchIndexAcknowledgment(true) + .subscriptionType(SubscriptionType.Shared) .subscribe(); Awaitility.await().until(consumer::isConnected);