Skip to content

Commit a0b5b70

Browse files
GH-4170 : Add KafkaListener Validation (Allow @Topic or @TopicPartition) (#4172)
Fixes #4170 **Auto-cherry-pick to `3.3.x`** * add topic validation of kafka listener * refactor: allow topic pattern for validtion Signed-off-by: moonyoungCHAE <xpf_fl@naver.com>
1 parent 9eb09a9 commit a0b5b70

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,9 +663,12 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
663663
endpoint.setBean(bean);
664664
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
665665
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
666+
667+
assertTopic(kafkaListener);
666668
endpoint.setTopicPartitions(tps);
667669
endpoint.setTopics(topics);
668670
endpoint.setTopicPattern(resolvePattern(kafkaListener));
671+
669672
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
670673
endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
671674
String group = kafkaListener.containerGroup();
@@ -856,6 +859,20 @@ private String getEndpointGroupId(KafkaListener kafkaListener, @Nullable String
856859
return groupId;
857860
}
858861

862+
private void assertTopic(KafkaListener kafkaListener) {
863+
int count = 0;
864+
if (!kafkaListener.topicPattern().isEmpty()) {
865+
count++;
866+
}
867+
if (kafkaListener.topics().length > 0) {
868+
count++;
869+
}
870+
if (kafkaListener.topicPartitions().length > 0) {
871+
count++;
872+
}
873+
Assert.state(count == 1, "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");
874+
}
875+
859876
private TopicPartitionOffset[] resolveTopicPartitions(KafkaListener kafkaListener) {
860877
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
861878
List<TopicPartitionOffset> result = new ArrayList<>();

0 commit comments

Comments
 (0)