Skip to content

Commit 9deee37

Browse files
authored
[fix][client] PIP-84: Skip processing a message in the message listener if the consumer epoch is no longer valid (#25007)
1 parent 43f75df commit 9deee37

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,17 @@ protected void callMessageListener(Message<T> msg) {
12101210
// after enabled message listener.
12111211
receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg instanceof TopicMessageImpl
12121212
? ((TopicMessageImpl<T>) msg).getMessage() : msg));
1213+
1214+
MessageImpl<T> innerMessage = (MessageImpl<T>) (msg instanceof TopicMessageImpl
1215+
? ((TopicMessageImpl<T>) msg).getMessage() : msg);
1216+
if (!receivedConsumer.isValidConsumerEpoch(innerMessage)) {
1217+
if (log.isDebugEnabled()) {
1218+
log.debug("[{}][{}] Skipping processing message since the consumer epoch is not valid. {}", topic,
1219+
subscription, msg.getMessageId());
1220+
}
1221+
return;
1222+
}
1223+
12131224
MessageId id;
12141225
if (this instanceof ConsumerImpl) {
12151226
id = MessageIdAdvUtils.discardBatch(msg.getMessageId());

0 commit comments

Comments
 (0)