Skip to content

Commit 18c26e0

Browse files
authored
Do not create any producer if the output type of a function is void (apache#2756)
* Do not create any producer if the output type of a function is void * Do not write if the record is null * Revert the check for null
1 parent 362f795 commit 18c26e0

File tree

2 files changed

+8
-10
lines changed

2 files changed

+8
-10
lines changed

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,10 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
218218
log.info("Opening pulsar sink with config: {}", pulsarSinkConfig);
219219

220220
Schema<T> schema = initializeSchema();
221+
if (schema == null) {
222+
log.info("Since output type is null, not creating any real sink");
223+
return;
224+
}
221225

222226
FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
223227
switch (processingGuarantees) {
@@ -283,7 +287,7 @@ Schema<T> initializeSchema() throws ClassNotFoundException {
283287

284288
if (Void.class.equals(typeArg)) {
285289
// return type is 'void', so there's no schema to check
286-
return (Schema<T>) Schema.BYTES;
290+
return null;
287291
}
288292

289293
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {

pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,7 @@
4242
import lombok.Setter;
4343
import lombok.extern.slf4j.Slf4j;
4444

45-
import org.apache.pulsar.client.api.Consumer;
46-
import org.apache.pulsar.client.api.ConsumerBuilder;
47-
import org.apache.pulsar.client.api.MessageId;
48-
import org.apache.pulsar.client.api.Producer;
49-
import org.apache.pulsar.client.api.ProducerBuilder;
50-
import org.apache.pulsar.client.api.PulsarClient;
51-
import org.apache.pulsar.client.api.PulsarClientException;
52-
import org.apache.pulsar.client.api.TypedMessageBuilder;
45+
import org.apache.pulsar.client.api.*;
5346
import org.apache.pulsar.client.impl.PulsarClientImpl;
5447
import org.apache.pulsar.functions.api.Record;
5548
import org.apache.pulsar.functions.api.SerDe;
@@ -169,7 +162,8 @@ public void testVoidOutputClasses() throws Exception {
169162
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
170163

171164
try {
172-
pulsarSink.initializeSchema();
165+
Schema schema = pulsarSink.initializeSchema();
166+
assertEquals(schema, (Schema)null);
173167
} catch (Exception ex) {
174168
ex.printStackTrace();
175169
assertEquals(ex, null);

0 commit comments

Comments
 (0)