Skip to content

Commit ec609af

Browse files
authored
[fix][broker] Fix issue with schemaValidationEnforced in geo-replication (#25012)
1 parent c8d6208 commit ec609af

File tree

3 files changed

+110
-1
lines changed

3 files changed

+110
-1
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import java.util.concurrent.TimeUnit;
23+
import lombok.Data;
24+
import lombok.extern.slf4j.Slf4j;
25+
import org.apache.pulsar.broker.BrokerTestUtil;
26+
import org.apache.pulsar.broker.ServiceConfiguration;
27+
import org.apache.pulsar.client.api.Consumer;
28+
import org.apache.pulsar.client.api.Message;
29+
import org.apache.pulsar.client.api.Producer;
30+
import org.apache.pulsar.client.api.Schema;
31+
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
32+
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
33+
import org.testng.annotations.AfterClass;
34+
import org.testng.annotations.BeforeClass;
35+
import org.testng.annotations.Test;
36+
37+
@Slf4j
38+
@Test(groups = "broker-replication")
39+
public class OneWayReplicatorSchemaValidationEnforcedTest extends OneWayReplicatorTestBase {
40+
41+
@Override
42+
@BeforeClass(alwaysRun = true, timeOut = 300000)
43+
public void setup() throws Exception {
44+
super.setup();
45+
}
46+
47+
@Override
48+
@AfterClass(alwaysRun = true, timeOut = 300000)
49+
public void cleanup() throws Exception {
50+
super.cleanup();
51+
}
52+
53+
@Data
54+
private static class MyClass {
55+
int field1;
56+
String field2;
57+
Long field3;
58+
}
59+
60+
@Override
61+
protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
62+
LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
63+
super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
64+
config.setSchemaValidationEnforced(true);
65+
// disable topic auto creation so that it's possible to reproduce the scenario consistently
66+
config.setAllowAutoTopicCreation(false);
67+
}
68+
69+
@Test(timeOut = 30000)
70+
public void testReplicationWithAvroSchemaWithSchemaValidationEnforced() throws Exception {
71+
Schema<MyClass> myClassSchema = Schema.AVRO(MyClass.class);
72+
final String topicName =
73+
BrokerTestUtil.newUniqueName("persistent://" + sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_");
74+
// create the topic and schema in the local cluster (r1)
75+
admin1.topics().createNonPartitionedTopic(topicName);
76+
admin1.schemas().createSchema(topicName, myClassSchema.getSchemaInfo());
77+
// create the topic and schema in the remote cluster (r2)
78+
admin2.topics().createNonPartitionedTopic(topicName);
79+
admin2.schemas().createSchema(topicName, myClassSchema.getSchemaInfo());
80+
81+
// consume from the remote cluster (r2)
82+
Consumer<MyClass> consumer2 = client2.newConsumer(myClassSchema)
83+
.topic(topicName).subscriptionName("sub").subscribe();
84+
85+
// produce to local cluster (r1)
86+
Producer<MyClass> producer1 = client1.newProducer(myClassSchema).topic(topicName).create();
87+
MyClass sentBody = new MyClass();
88+
sentBody.setField1(1);
89+
sentBody.setField2("test");
90+
sentBody.setField3(123456789L);
91+
producer1.send(sentBody);
92+
93+
// verify that the message was received from the remote cluster (r2)
94+
Message<MyClass> received = consumer2.receive(10, TimeUnit.SECONDS);
95+
assertThat(received).isNotNull();
96+
assertThat(received.getValue()).isNotNull().satisfies(receivedBody -> {
97+
assertThat(receivedBody.getField1()).isEqualTo(1);
98+
assertThat(receivedBody.getField2()).isEqualTo("test");
99+
assertThat(receivedBody.getField3()).isEqualTo(123456789L);
100+
});
101+
}
102+
103+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurat
424424

425425
if (schema instanceof AutoProduceBytesSchema) {
426426
AutoProduceBytesSchema autoProduceBytesSchema = (AutoProduceBytesSchema) schema;
427-
if (autoProduceBytesSchema.schemaInitialized()) {
427+
if (autoProduceBytesSchema.hasUserProvidedSchema()) {
428428
return createProducerAsync(topic, conf, schema, interceptors);
429429
}
430430
return lookup.getSchema(TopicName.get(conf.getTopicName()))

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
3535
@Setter
3636
private boolean requireSchemaValidation = true;
3737
private Schema<T> schema;
38+
private boolean userProvidedSchema;
3839

3940
public AutoProduceBytesSchema() {
4041
}
4142

4243
public AutoProduceBytesSchema(Schema<T> schema) {
4344
this.schema = schema;
45+
this.userProvidedSchema = true;
4446
SchemaInfo schemaInfo = schema.getSchemaInfo();
4547
this.requireSchemaValidation = schemaInfo != null
4648
&& schemaInfo.getType() != SchemaType.BYTES
@@ -62,6 +64,10 @@ public boolean schemaInitialized() {
6264
return schema != null;
6365
}
6466

67+
public boolean hasUserProvidedSchema() {
68+
return userProvidedSchema;
69+
}
70+
6571
@Override
6672
public void validate(byte[] message) {
6773
ensureSchemaInitialized();

0 commit comments

Comments
 (0)