-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[test] Flaky-test: OneWayReplicatorDeduplicationTest.cleanup #24988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
@hageshiame Please add the following content to your PR description and select a checkbox: |
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a review comment, please check. Please also make sure to add a high-level description of the modifications to the PR description.
| // Added: Wait for replicator to completely stop | ||
| Awaitility.await().pollDelay(Duration.ofMillis(500)).untilAsserted(() -> { | ||
| for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { | ||
| if (cursor.getName().equals("pulsar.repl.r2")) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| }); | ||
| admin1.topics().delete(topicName); | ||
| admin2.topics().delete(topicName); | ||
| } | ||
|
|
||
| @DataProvider(name = "enabledDeduplication") | ||
| public Object[][] enabledDeduplication() { | ||
| return new Object[][] { | ||
| {true}, | ||
| {false} | ||
| }; | ||
| } | ||
|
|
||
| /*** | ||
| * To reproduce the issue that replication loss message if enabled deduplication | ||
| * 1. Publishing in the source cluster | ||
| * 1-1. Producer-1 send 2 messages: M1, M2 | ||
| * 1-2. Producer-2 send 2 messages: M3, M4 | ||
| * 2. Replicate messages to the remote cluster | ||
| * 2-1. Copies M1 and M2 | ||
| * 2-2. Repeatedly copies M1 and M2. and copies M3 and M4. | ||
| * 2-2-1. After repeatedly copies M1 and M2, the network broke. | ||
| * 3. After a topic unloading. | ||
| * 3-1. The replicator will start after the topic is loaded up. | ||
| * 3-2. The client will create a new connection. | ||
| * 4. Verify: All 4 messages are copied to the remote cluster. | ||
| */ | ||
| @Test(timeOut = 360 * 1000, dataProvider = "enabledDeduplication") | ||
| public void testDeduplicationNotLostMessage(boolean enabledDeduplication) throws Exception { | ||
| waitInternalClientCreated(); | ||
|
|
||
| /** | ||
| * step-2: Inject a mechanism that makes the client connect broke after repeatedly copied M1 and M2. | ||
| */ | ||
| final List<ByteBufPair> duplicatedMsgs = new ArrayList<>(); | ||
| final int repeatMsgIndex = 2; | ||
| AtomicInteger msgSent = new AtomicInteger(0); | ||
| ConcurrentHashSet<Channel> injectedChannel = new ConcurrentHashSet<>(); | ||
| Runnable taskToClearInjection = injectReplicatorClientCnx( | ||
| (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { | ||
|
|
||
| @Override | ||
| public ChannelHandlerContext ctx() { | ||
| final ChannelHandlerContext originalCtx = super.ctx; | ||
| ChannelHandlerContext spyContext = spy(originalCtx); | ||
| Answer injectedAnswer = invocation -> { | ||
| // Do not repeat the messages re-sending, and clear the previous cached messages when | ||
| // calling re-sending, to avoid publishing outs of order. | ||
| for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) { | ||
| if (stackTraceElement.toString().contains("recoverProcessOpSendMsgFrom") | ||
| || stackTraceElement.toString().contains("resendMessages")) { | ||
| duplicatedMsgs.clear(); | ||
| return invocation.callRealMethod(); | ||
| } | ||
| } | ||
|
|
||
| Object data = invocation.getArguments()[0]; | ||
| if (!(data instanceof ByteBufPair)) { | ||
| return invocation.callRealMethod(); | ||
| } | ||
| // Repeatedly send every message. | ||
| ByteBufPair byteBufPair = (ByteBufPair) data; | ||
| ByteBuf buf1 = byteBufPair.getFirst(); | ||
| ByteBuf buf2 = byteBufPair.getSecond(); | ||
| int bufferIndex1 = buf1.readerIndex(); | ||
| int bufferIndex2 = buf2.readerIndex(); | ||
| // Skip totalSize. | ||
| buf1.readInt(); | ||
| int cmdSize = buf1.readInt(); | ||
| BaseCommand cmd = new BaseCommand(); | ||
| cmd.parseFrom(buf1, cmdSize); | ||
| buf1.readerIndex(bufferIndex1); | ||
| if (cmd.getType().equals(BaseCommand.Type.SEND)) { | ||
| synchronized (duplicatedMsgs) { | ||
| if (duplicatedMsgs.isEmpty() && msgSent.get() == repeatMsgIndex) { | ||
| return null; | ||
| } | ||
| if (msgSent.get() == repeatMsgIndex) { | ||
| for (ByteBufPair bufferPair : duplicatedMsgs) { | ||
| originalCtx.channel().write(bufferPair, originalCtx.voidPromise()); | ||
| originalCtx.channel().flush(); | ||
| } | ||
| duplicatedMsgs.clear(); | ||
| return null; | ||
| } | ||
| } | ||
| ByteBuf newBuffer1 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( | ||
| buf1.readableBytes()); | ||
| buf1.readBytes(newBuffer1); | ||
| buf1.readerIndex(bufferIndex1); | ||
| ByteBuf newBuffer2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( | ||
| buf2.readableBytes()); | ||
| buf2.readBytes(newBuffer2); | ||
| buf2.readerIndex(bufferIndex2); | ||
| synchronized (duplicatedMsgs) { | ||
| if (newBuffer2.readableBytes() > 0 && msgSent.incrementAndGet() <= repeatMsgIndex) { | ||
| duplicatedMsgs.add(ByteBufPair.get(newBuffer1, newBuffer2)); | ||
| } | ||
| } | ||
| return invocation.callRealMethod(); | ||
| } else { | ||
| return invocation.callRealMethod(); | ||
| } | ||
| }; | ||
| doAnswer(injectedAnswer).when(spyContext).write(any()); | ||
| doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class)); | ||
| doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any()); | ||
| doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class)); | ||
| injectedChannel.add(originalCtx.channel()); | ||
| return spyContext; | ||
| } | ||
| }); | ||
|
|
||
| // Create topics and enable deduplication. | ||
| final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); | ||
| admin1.topics().createNonPartitionedTopic(topicName); | ||
| admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); | ||
| admin2.topics().createNonPartitionedTopic(topicName); | ||
| admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); | ||
| PersistentTopic tp1 = | ||
| (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); | ||
| PersistentTopic tp2 = | ||
| (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); | ||
| ManagedLedgerImpl ml2 = (ManagedLedgerImpl) tp2.getManagedLedger(); | ||
| if (enabledDeduplication) { | ||
| Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { | ||
| PersistentTopic persistentTopic1 = | ||
| (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); | ||
| PersistentTopic persistentTopic2 = | ||
| (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); | ||
| admin1.topicPolicies().setDeduplicationStatus(topicName, true); | ||
| admin2.topicPolicies().setDeduplicationStatus(topicName, true); | ||
| assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), | ||
| Boolean.TRUE); | ||
| assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), | ||
| Boolean.TRUE); | ||
| }); | ||
| } | ||
| // Let broker persist messages one by one, in other words, it starts to persist the next message after the | ||
| // previous has been written into BKs. | ||
| PersistentTopic spyTp2 = spy(tp2); | ||
| doAnswer(invocation -> { | ||
| try { | ||
| Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { | ||
| assertEquals(ml2.getPendingAddEntriesCount(), 0); | ||
| }); | ||
| } catch (Throwable throwable) { | ||
| // Ignore this timeout error. | ||
| } | ||
| return invocation.callRealMethod(); | ||
| }).when(spyTp2).publishMessage(any(ByteBuf.class), any(Topic.PublishContext.class)); | ||
| CompletableFuture<Optional<Topic>> originalTp2 = pulsar2.getBrokerService().getTopics().put(tp2.getName(), | ||
| CompletableFuture.completedFuture(Optional.of(spyTp2))); | ||
|
|
||
| /** | ||
| * Step-1: Publishes messages in the source cluster and start replication, | ||
| */ | ||
| ProducerImpl p1 = (ProducerImpl) client1.newProducer().topic(topicName).producerName("p1").create(); | ||
| ProducerImpl p2 = (ProducerImpl) client1.newProducer().topic(topicName).producerName("p2").create(); | ||
| p1.send("1".toString().getBytes(StandardCharsets.UTF_8)); | ||
| p1.send("2".toString().getBytes(StandardCharsets.UTF_8)); | ||
| p2.send("3".toString().getBytes(StandardCharsets.UTF_8)); | ||
| p2.send("4".toString().getBytes(StandardCharsets.UTF_8)); | ||
|
|
||
| // Enable replication and wait the task to be finished, it should not finish if no bug. | ||
| admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); | ||
| waitReplicatorStarted(topicName); | ||
| try { | ||
| Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { | ||
| for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { | ||
| if (cursor.getName().equals("pulsar.repl.r2")) { | ||
| long replBacklog = cursor.getNumberOfEntriesInBacklog(true); | ||
| log.info("repl backlog: {}", replBacklog); | ||
| assertEquals(replBacklog, 0); | ||
| } | ||
| } | ||
| }); | ||
| } catch (Throwable t) { | ||
| // Ignore the error. | ||
| } | ||
|
|
||
| /** | ||
| * Step-3: remove the injections, unload topics and rebuild connections of the replicator. | ||
| */ | ||
| taskToClearInjection.run(); | ||
| pulsar2.getBrokerService().getTopics().put(tp2.getName(), originalTp2); | ||
| admin1.topics().unload(topicName); | ||
| admin2.topics().unload(topicName); | ||
| for (Channel channel : injectedChannel) { | ||
| channel.close(); | ||
| } | ||
| waitReplicatorStarted(topicName); | ||
| PersistentTopic tp12 = | ||
| (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); | ||
| PersistentTopic tp22 = | ||
| (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); | ||
| Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { | ||
| for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) { | ||
| if (cursor.getName().equals("pulsar.repl.r2")) { | ||
| long replBacklog = cursor.getNumberOfEntriesInBacklog(true); | ||
| log.info("repl backlog: {}", replBacklog); | ||
| assertEquals(replBacklog, 0); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| /** | ||
| * Verify: All 4 messages are copied to the remote cluster. | ||
| */ | ||
| List<String> msgReceived = new ArrayList<>(); | ||
| Consumer consumer = client2.newConsumer().topic(topicName) | ||
| .subscriptionName("s1").subscribe(); | ||
| while (true) { | ||
| Message msg = consumer.receive(10, TimeUnit.SECONDS); | ||
| if (msg == null) { | ||
| break; | ||
| } | ||
| MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); | ||
| log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), | ||
| messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); | ||
| msgReceived.add(new String(msg.getData(), StandardCharsets.UTF_8)); | ||
| consumer.acknowledgeAsync(msg); | ||
| } | ||
|
|
||
| log.info("received msgs: {}", msgReceived); | ||
| assertTrue(msgReceived.contains("1")); | ||
| assertTrue(msgReceived.contains("2")); | ||
| assertTrue(msgReceived.contains("3")); | ||
| assertTrue(msgReceived.contains("4")); | ||
| if (enabledDeduplication) { | ||
| assertEquals(msgReceived, Arrays.asList("1", "2", "3", "4")); | ||
| } | ||
|
|
||
| // cleanup. | ||
| consumer.close(); | ||
| p1.close(); | ||
| p2.close(); | ||
| admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); | ||
| waitReplicatorStopped(topicName); | ||
| Awaitility.await().until(() -> { | ||
| for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) { | ||
| // Added: Wait for replicator to completely stop | ||
| Awaitility.await().pollDelay(Duration.ofMillis(500)).untilAsserted(() -> { | ||
| for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { | ||
| if (cursor.getName().equals("pulsar.repl.r2")) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| }); | ||
| admin1.topics().delete(topicName); | ||
| admin2.topics().delete(topicName); | ||
| } | ||
|
|
||
| @Test(timeOut = 360 * 1000, dataProvider = "enabledDeduplication") | ||
| public void testReplicationLoadSchemaTimeout(boolean enabledDeduplication) throws Exception { | ||
| waitInternalClientCreated(); | ||
|
|
||
| /** | ||
| * Inject a timeout error for Get Schema. | ||
| */ | ||
| Field filedSchemaRegistryService = PulsarService.class.getDeclaredField("schemaRegistryService"); | ||
| filedSchemaRegistryService.setAccessible(true); | ||
| SchemaRegistryService originalSchemaRegistryService = | ||
| (SchemaRegistryService) filedSchemaRegistryService.get(pulsar2); | ||
| SchemaRegistryService spySchemaRegistryService = spy(originalSchemaRegistryService); | ||
| AtomicBoolean getSchemaSuccess = new AtomicBoolean(false); | ||
| doAnswer(invocation -> { | ||
| if (getSchemaSuccess.get()) { | ||
| getSchemaSuccess.set(false); | ||
| return invocation.callRealMethod(); | ||
| } else { | ||
| getSchemaSuccess.set(true); | ||
| } | ||
| Thread.sleep(60 * 1000); | ||
| return invocation.callRealMethod(); | ||
| }).when(spySchemaRegistryService).findSchemaVersion(any(String.class), any(SchemaData.class)); | ||
| filedSchemaRegistryService.set(pulsar2, spySchemaRegistryService); | ||
| Runnable taskToClearInjection = injectReplicatorClientCnx( | ||
| (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { | ||
| @Override | ||
| protected void handleGetSchemaResponse(CommandGetSchemaResponse commandGetSchemaResponse) { | ||
| if (getSchemaSuccess.get()) { | ||
| getSchemaSuccess.set(false); | ||
| super.handleGetSchemaResponse(commandGetSchemaResponse); | ||
| return; | ||
| } else { | ||
| getSchemaSuccess.set(true); | ||
| } | ||
| checkArgument(state == State.Ready); | ||
| long requestId = commandGetSchemaResponse.getRequestId(); | ||
| CompletableFuture<CommandGetSchemaResponse> future = | ||
| (CompletableFuture<CommandGetSchemaResponse>) pendingRequests.remove(requestId); | ||
| if (future == null) { | ||
| duplicatedResponseCounter.incrementAndGet(); | ||
| log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId); | ||
| return; | ||
| } | ||
| future.completeExceptionally(new PulsarClientException.TimeoutException("Mocked timeout")); | ||
| } | ||
|
|
||
| @Override | ||
| protected void handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse | ||
| commandGetOrCreateSchemaResponse) { | ||
|
|
||
| if (getSchemaSuccess.get()) { | ||
| getSchemaSuccess.set(false); | ||
| super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse); | ||
| return; | ||
| } else { | ||
| getSchemaSuccess.set(true); | ||
| } | ||
|
|
||
| checkArgument(state == State.Ready); | ||
| long requestId = commandGetOrCreateSchemaResponse.getRequestId(); | ||
| CompletableFuture<CommandGetOrCreateSchemaResponse> future = | ||
| (CompletableFuture<CommandGetOrCreateSchemaResponse>) pendingRequests.remove(requestId); | ||
| if (future == null) { | ||
| duplicatedResponseCounter.incrementAndGet(); | ||
| log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId); | ||
| return; | ||
| } | ||
| future.completeExceptionally(new PulsarClientException.TimeoutException("Mocked timeout")); | ||
| } | ||
| }); | ||
|
|
||
| // Create topics and enable deduplication. | ||
| final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); | ||
| admin1.topics().createNonPartitionedTopic(topicName); | ||
| admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); | ||
| admin2.topics().createNonPartitionedTopic(topicName); | ||
| admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); | ||
| PersistentTopic tp1 = | ||
| (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); | ||
| PersistentTopic tp2 = | ||
| (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); | ||
| if (enabledDeduplication) { | ||
| Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { | ||
| PersistentTopic persistentTopic1 = | ||
| (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); | ||
| PersistentTopic persistentTopic2 = | ||
| (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); | ||
| admin1.topicPolicies().setDeduplicationStatus(topicName, true); | ||
| admin2.topicPolicies().setDeduplicationStatus(topicName, true); | ||
| assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), | ||
| Boolean.TRUE); | ||
| assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), | ||
| Boolean.TRUE); | ||
| }); | ||
| } | ||
| Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { | ||
| PersistentTopic persistentTopic1 = | ||
| (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); | ||
| PersistentTopic persistentTopic2 = | ||
| (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); | ||
| admin1.topicPolicies().setSchemaCompatibilityStrategy(topicName, | ||
| SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); | ||
| admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName, | ||
| SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); | ||
| assertEquals(persistentTopic1.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), | ||
| SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); | ||
| assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), | ||
| SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); | ||
| }); | ||
|
|
||
| // Publishes messages in the source cluster. | ||
| Producer p1 = client1.newProducer().topic(topicName).producerName("p1").create(); | ||
| Producer p2 = client1.newProducer().topic(topicName).producerName("p2").create(); | ||
| Producer p3 = client1.newProducer(Schema.STRING).topic(topicName).producerName("p3").create(); | ||
| p1.send("1".toString().getBytes(StandardCharsets.UTF_8)); | ||
| p1.send("2".toString().getBytes(StandardCharsets.UTF_8)); | ||
| p3.send("2-1"); | ||
| p3.send("2-2"); | ||
| p2.send("3".toString().getBytes(StandardCharsets.UTF_8)); | ||
| p2.send("4".toString().getBytes(StandardCharsets.UTF_8)); | ||
|
|
||
| // Enable replication and wait the task to be finished, it should not finish if no bug. | ||
| admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); | ||
| waitReplicatorStarted(topicName); | ||
| Awaitility.await().atMost(Duration.ofSeconds(180)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { | ||
| for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { | ||
| if (cursor.getName().equals("pulsar.repl.r2")) { | ||
| long replBacklog = cursor.getNumberOfEntriesInBacklog(true); | ||
| log.info("repl backlog: {}", replBacklog); | ||
| assertEquals(replBacklog, 0); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| // Verify: All messages are copied to the remote cluster. | ||
| List<String> msgReceived = new ArrayList<>(); | ||
| Consumer consumer = client2.newConsumer().topic(topicName) | ||
| .subscriptionName("s1").subscribe(); | ||
| while (true) { | ||
| Message msg = consumer.receive(10, TimeUnit.SECONDS); | ||
| if (msg == null) { | ||
| break; | ||
| } | ||
| MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); | ||
| log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), | ||
| messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); | ||
| msgReceived.add(new String(msg.getData(), StandardCharsets.UTF_8)); | ||
| consumer.acknowledgeAsync(msg); | ||
| } | ||
| log.info("received msgs: {}", msgReceived); | ||
| assertTrue(msgReceived.contains("1")); | ||
| assertTrue(msgReceived.contains("2")); | ||
| assertTrue(msgReceived.contains("2-1")); | ||
| assertTrue(msgReceived.contains("2-2")); | ||
| assertTrue(msgReceived.contains("3")); | ||
| assertTrue(msgReceived.contains("4")); | ||
| if (enabledDeduplication) { | ||
| assertEquals(msgReceived, Arrays.asList("1", "2", "2-1", "2-2", "3", "4")); | ||
| } | ||
|
|
||
| // cleanup. | ||
| taskToClearInjection.run(); | ||
| filedSchemaRegistryService.set(pulsar2, originalSchemaRegistryService); | ||
| consumer.close(); | ||
| p1.close(); | ||
| p2.close(); | ||
| p3.close(); | ||
| admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); | ||
| waitReplicatorStopped(topicName); | ||
| Awaitility.await().until(() -> { | ||
| // Added: Wait for replicator to completely stop | ||
| Awaitility.await().pollDelay(Duration.ofMillis(500)).untilAsserted(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes don't make sense since until is changed to untilAsserted.
untilAssertedexpects that there's some assertion that is expected to succeed.untilexpects that the return value istrue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hageshiame Perhaps the inner for loop can be changed to collect conditions, and then use assertTrue / assertFalse to express.
// Added: Wait for replicator to completely stop
Awaitility.await().pollDelay(Duration.ofMillis(500)).untilAsserted(() -> {
boolean hasReplicatorCursor = false;
for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) {
if (cursor.getName().equals("pulsar.repl.r2")) {
hasReplicatorCursor = true;
break;
}
}
assertTrue(!hasReplicatorCursor);
});
Motivation
Related to issue-24581
doc-not-neededModifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: