Skip to content

Commit 1555607

Browse files
authored
[improve][broker]Improve error response of failed to delete topic if it has replicators connected (#24975)
1 parent ed31d82 commit 1555607

File tree

8 files changed

+65
-12
lines changed

8 files changed

+65
-12
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -796,8 +796,7 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
796796
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
797797
if (realCause instanceof PreconditionFailedException) {
798798
asyncResponse.resume(
799-
new RestException(Status.PRECONDITION_FAILED,
800-
"Topic has active producers/subscriptions"));
799+
new RestException(Status.PRECONDITION_FAILED, realCause.getMessage()));
801800
} else if (realCause instanceof WebApplicationException){
802801
asyncResponse.resume(realCause);
803802
} else if (realCause instanceof MetadataStoreException.NotFoundException) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2838,7 +2838,7 @@ protected void handleNewTxn(CommandNewTxn command) {
28382838
.whenComplete(((txnID, ex) -> {
28392839
if (ex == null) {
28402840
if (log.isDebugEnabled()) {
2841-
log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
2841+
log.debug("Send response {} for new txn request {}", txnID, requestId);
28422842
}
28432843
commandSender.sendNewTxnResponse(requestId, txnID, tcId.getId());
28442844
} else {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,8 +1520,23 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
15201520
"Topic has " + producers.size() + " connected producers"));
15211521
}
15221522
} else if (currentUsageCount() > 0) {
1523-
return FutureUtil.failedFuture(new TopicBusyException(
1524-
"Topic has " + currentUsageCount() + " connected producers/consumers"));
1523+
StringBuilder errorMsg = new StringBuilder("Topic has");
1524+
errorMsg.append(" ").append(currentUsageCount())
1525+
.append(currentUsageCount() == 1 ? " client" : " clients").append(" connected");
1526+
long consumerCount = subscriptions.values().stream().map(sub -> sub.getConsumers().size())
1527+
.reduce(0, Integer::sum);
1528+
long replicatorCount = 0;
1529+
long producerCount = 0;
1530+
if (!producers.isEmpty()) {
1531+
replicatorCount = producers.values().stream().filter(Producer::isRemote).count();
1532+
if (producers.size() > replicatorCount) {
1533+
producerCount = producers.size() - replicatorCount;
1534+
}
1535+
}
1536+
errorMsg.append(" Including").append(" ").append(consumerCount).append(" consumers,")
1537+
.append(" ").append(producerCount).append(" producers,").append(" and")
1538+
.append(" ").append(replicatorCount).append(" replicators.");
1539+
return FutureUtil.failedFuture(new TopicBusyException(errorMsg.toString()));
15251540
}
15261541
}
15271542

pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar
201201
if (forceDelete) {
202202
throw e;
203203
}
204-
assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")
205-
|| e.getMessage().contains("connected producers/consumers"));
204+
assertTrue(e.getMessage().contains("Topic has"));
206205
}
207206

208207
final String[] expectedEvents;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,37 @@ public void cleanup() throws Exception {
139139
super.cleanup();
140140
}
141141

142+
@Test(timeOut = 45 * 1000)
143+
public void testDeleteTopicWhenReplicating() throws Exception {
144+
final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
145+
Producer<byte[]> producer1 = client1.newProducer().topic(topicName1).create();
146+
waitReplicatorStarted(topicName1);
147+
try {
148+
admin2.topics().delete(topicName1);
149+
fail("Should fail to delete topic when replicating");
150+
} catch (PulsarAdminException.PreconditionFailedException ex) {
151+
assertTrue(ex.getMessage().contains("1 replicators"));
152+
}
153+
154+
final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
155+
admin1.topics().createPartitionedTopic(topicName2, 1);
156+
Producer<byte[]> producer2 = client1.newProducer().topic(topicName2).create();
157+
waitReplicatorStarted(TopicName.get(topicName2).getPartition(0).toString());
158+
try {
159+
admin2.topics().deletePartitionedTopic(topicName2);
160+
fail("Should fail to delete topic when replicating");
161+
} catch (PulsarAdminException.PreconditionFailedException ex) {
162+
assertTrue(ex.getMessage().contains("1 replicators"));
163+
}
164+
165+
producer1.close();
166+
producer2.close();
167+
cleanupTopics(() -> {
168+
admin1.topics().delete(topicName1);
169+
admin2.topics().deletePartitionedTopic(topicName2);
170+
});
171+
}
172+
142173
@Test(timeOut = 45 * 1000)
143174
public void testReplicatorProducerStatInTopic() throws Exception {
144175
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
7373
config.setDefaultNumPartitions(1);
7474
}
7575

76+
@Test(enabled = false)
77+
public void testDeleteTopicWhenReplicating() throws Exception {
78+
super.testDeleteTopicWhenReplicating();
79+
}
80+
7681
@Override
7782
@Test(enabled = false)
7883
public void testReplicatorProducerStatInTopic() throws Exception {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,12 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
8383
config.setTransactionCoordinatorEnabled(true);
8484
}
8585

86+
@Test(enabled = false)
87+
public void testDeleteTopicWhenReplicating() throws Exception {
88+
super.testDeleteTopicWhenReplicating();
89+
}
8690

87-
@Test(enabled = false)
91+
@Test(enabled = false)
8892
public void testReplicatorProducerStatInTopic() throws Exception {
8993
super.testReplicatorProducerStatInTopic();
9094
}

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -962,7 +962,7 @@ public void testDeleteTopicAndSchemaForV1() throws Exception {
962962
} catch (Exception e) {
963963
assertThat(e.getMessage())
964964
.isNotNull()
965-
.startsWith("Topic has 2 connected producers/consumers");
965+
.startsWith("Topic has 2 clients");
966966
}
967967
assertEquals(this.getPulsar().getSchemaRegistryService()
968968
.trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2);
@@ -972,7 +972,7 @@ public void testDeleteTopicAndSchemaForV1() throws Exception {
972972
} catch (Exception e) {
973973
assertThat(e.getMessage())
974974
.isNotNull()
975-
.startsWith("Topic has active producers/subscriptions");
975+
.startsWith("Topic has 1 client");
976976
}
977977
assertEquals(this.getPulsar().getSchemaRegistryService()
978978
.trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 1);
@@ -1055,15 +1055,15 @@ public void testDeleteTopicAndSchemaForV2() throws Exception {
10551055
admin.topics().delete(topicOne, false);
10561056
fail();
10571057
} catch (Exception e) {
1058-
assertTrue(e.getMessage().startsWith("Topic has 2 connected producers/consumers"));
1058+
assertTrue(e.getMessage().startsWith("Topic has 2 clients"));
10591059
}
10601060
assertEquals(this.getPulsar().getSchemaRegistryService()
10611061
.trimDeletedSchemaAndGetList(TopicName.get(topicOne).getSchemaName()).get().size(), 2);
10621062
try {
10631063
admin.topics().deletePartitionedTopic(topicTwo, false);
10641064
fail();
10651065
} catch (Exception e) {
1066-
assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions"));
1066+
assertTrue(e.getMessage().startsWith("Topic has 1 client"));
10671067
}
10681068
assertEquals(this.getPulsar().getSchemaRegistryService()
10691069
.trimDeletedSchemaAndGetList(TopicName.get(topicTwo).getSchemaName()).get().size(), 1);

0 commit comments

Comments
 (0)