File tree Expand file tree Collapse file tree 1 file changed +11
-0
lines changed
pulsar-client/src/main/java/org/apache/pulsar/client/impl Expand file tree Collapse file tree 1 file changed +11
-0
lines changed Original file line number Diff line number Diff line change @@ -1210,6 +1210,17 @@ protected void callMessageListener(Message<T> msg) {
12101210 // after enabled message listener.
12111211 receivedConsumer .increaseAvailablePermits ((MessageImpl <?>) (msg instanceof TopicMessageImpl
12121212 ? ((TopicMessageImpl <T >) msg ).getMessage () : msg ));
1213+
1214+ MessageImpl <T > innerMessage = (MessageImpl <T >) (msg instanceof TopicMessageImpl
1215+ ? ((TopicMessageImpl <T >) msg ).getMessage () : msg );
1216+ if (!receivedConsumer .isValidConsumerEpoch (innerMessage )) {
1217+ if (log .isDebugEnabled ()) {
1218+ log .debug ("[{}][{}] Skipping processing message since the consumer epoch is not valid. {}" , topic ,
1219+ subscription , msg .getMessageId ());
1220+ }
1221+ return ;
1222+ }
1223+
12131224 MessageId id ;
12141225 if (this instanceof ConsumerImpl ) {
12151226 id = MessageIdAdvUtils .discardBatch (msg .getMessageId ());
You can’t perform that action at this time.
0 commit comments