Skip to content

Commit 7cd28b9

Browse files
authored
Support function with format: Function<I, CompletableFuture<O>> (apache#6684)
Fixes apache#6519 ### Motivation Currently, Pulsar Functions not support Async mode, e.g. user passed in a Function in format : ``` Function<I, CompletableFuture<O>> ``` This kind of function is useful if the function might use RPCs to call external systems. e.g. ```java public class AsyncFunction implements Function<String, CompletableFuture<O>> { CompletableFuture<O> apply (String input) { CompletableFuture future = new CompletableFuture(); ...function compute... future.whenComplete(() -> { ... call external system ... }); return future; } ``` ### Modifications - add support for Async Functions support. ### Verifying this change current ut passed. * support func: Function<I, CompletableFuture<O>> * add 2 examples * add limit to the max outstanding items
1 parent b9e9609 commit 7cd28b9

File tree

11 files changed

+289
-31
lines changed

11 files changed

+289
-31
lines changed

pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,7 @@ public enum Runtime {
113113
// to change behavior at runtime. Currently, this primarily used by the KubernetesManifestCustomizer
114114
// interface
115115
private String customRuntimeOptions;
116+
// Max pending async requests per instance to avoid large number of concurrent requests.
117+
// Only used in AsyncFunction. Default: 1000.
118+
private Integer maxPendingAsyncRequests = 1000;
116119
}

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public class InstanceConfig {
3636
private Function.FunctionAuthenticationSpec functionAuthenticationSpec;
3737
private int port;
3838
private String clusterName;
39+
// Max pending async requests per instance to avoid large number of concurrent requests.
40+
// Only used in AsyncFunction. Default: 1000
41+
private int maxPendingAsyncRequests = 1000;
3942

4043
/**
4144
* Get the string representation of {@link #getInstanceId()}.

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
*/
1919
package org.apache.pulsar.functions.instance;
2020

21+
import java.util.Iterator;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.Executor;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.LinkedBlockingQueue;
2126
import lombok.AccessLevel;
2227
import lombok.Getter;
2328
import lombok.extern.slf4j.Slf4j;
@@ -40,9 +45,18 @@ public class JavaInstance implements AutoCloseable {
4045
private Function function;
4146
private java.util.function.Function javaUtilFunction;
4247

43-
public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
48+
// for Async function max out standing items
49+
private final InstanceConfig instanceConfig;
50+
private final Executor executor;
51+
@Getter
52+
private final LinkedBlockingQueue<CompletableFuture<Void>> pendingAsyncRequests;
53+
54+
public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) {
4455

4556
this.context = contextImpl;
57+
this.instanceConfig = instanceConfig;
58+
this.executor = Executors.newSingleThreadExecutor();
59+
this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
4660

4761
// create the functions
4862
if (userClassObject instanceof Function) {
@@ -52,23 +66,63 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
5266
}
5367
}
5468

55-
public JavaExecutionResult handleMessage(Record<?> record, Object input) {
69+
public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
5670
if (context != null) {
5771
context.setCurrentMessageContext(record);
5872
}
73+
74+
final CompletableFuture<JavaExecutionResult> future = new CompletableFuture<>();
5975
JavaExecutionResult executionResult = new JavaExecutionResult();
76+
77+
final Object output;
78+
6079
try {
61-
Object output;
6280
if (function != null) {
6381
output = function.process(input, context);
6482
} else {
6583
output = javaUtilFunction.apply(input);
6684
}
67-
executionResult.setResult(output);
6885
} catch (Exception ex) {
6986
executionResult.setUserException(ex);
87+
future.complete(executionResult);
88+
return future;
89+
}
90+
91+
if (output instanceof CompletableFuture) {
92+
// Function is in format: Function<I, CompletableFuture<O>>
93+
try {
94+
pendingAsyncRequests.put((CompletableFuture) output);
95+
} catch (InterruptedException ie) {
96+
log.warn("Exception while put Async requests", ie);
97+
executionResult.setUserException(ie);
98+
future.complete(executionResult);
99+
return future;
100+
}
101+
102+
((CompletableFuture) output).whenCompleteAsync((obj, throwable) -> {
103+
if (log.isDebugEnabled()) {
104+
log.debug("Got result async: object: {}, throwable: {}", obj, throwable);
105+
}
106+
107+
if (throwable != null) {
108+
executionResult.setUserException(new Exception((Throwable)throwable));
109+
pendingAsyncRequests.remove(output);
110+
future.complete(executionResult);
111+
return;
112+
}
113+
executionResult.setResult(obj);
114+
pendingAsyncRequests.remove(output);
115+
future.complete(executionResult);
116+
}, executor);
117+
} else {
118+
if (log.isDebugEnabled()) {
119+
log.debug("Got result: object: {}", output);
120+
}
121+
executionResult.setResult(output);
122+
future.complete(executionResult);
70123
}
71-
return executionResult;
124+
125+
return future;
72126
}
73127

74128
@Override

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.gson.reflect.TypeToken;
2525
import io.netty.buffer.ByteBuf;
2626
import io.prometheus.client.CollectorRegistry;
27+
import java.util.concurrent.CompletableFuture;
2728
import lombok.AccessLevel;
2829
import lombok.Getter;
2930
import lombok.extern.slf4j.Slf4j;
@@ -214,7 +215,7 @@ JavaInstance setupJavaInstance() throws Exception {
214215
// start any log topic handler
215216
setupLogHandler();
216217

217-
return new JavaInstance(contextImpl, object);
218+
return new JavaInstance(contextImpl, object, instanceConfig);
218219
}
219220

220221
ContextImpl setupContext() {
@@ -254,7 +255,7 @@ public void run() {
254255
}
255256

256257
addLogTopicHandler();
257-
JavaExecutionResult result;
258+
CompletableFuture<JavaExecutionResult> result;
258259

259260
// set last invocation time
260261
stats.setLastInvocation(System.currentTimeMillis());
@@ -272,10 +273,6 @@ public void run() {
272273

273274
removeLogTopicHandler();
274275

275-
if (log.isDebugEnabled()) {
276-
log.debug("Got result: {}", result.getResult());
277-
}
278-
279276
try {
280277
processResult(currentRecord, result);
281278
} catch (Exception e) {
@@ -415,23 +412,27 @@ private void setupStateTable() throws Exception {
415412
}
416413

417414
private void processResult(Record srcRecord,
418-
JavaExecutionResult result) throws Exception {
419-
if (result.getUserException() != null) {
420-
log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException());
421-
stats.incrUserExceptions(result.getUserException());
422-
srcRecord.fail();
423-
} else {
424-
if (result.getResult() != null) {
425-
sendOutputMessage(srcRecord, result.getResult());
415+
CompletableFuture<JavaExecutionResult> result) throws Exception {
416+
result.whenComplete((result1, throwable) -> {
417+
if (throwable != null || result1.getUserException() != null) {
418+
Throwable t = throwable != null ? throwable : result1.getUserException();
419+
log.warn("Encountered exception when processing message {}",
420+
srcRecord, t);
421+
stats.incrUserExceptions(t);
422+
srcRecord.fail();
426423
} else {
427-
if (instanceConfig.getFunctionDetails().getAutoAck()) {
428-
// the function doesn't produce any result or the user doesn't want the result.
429-
srcRecord.ack();
424+
if (result1.getResult() != null) {
425+
sendOutputMessage(srcRecord, result1.getResult());
426+
} else {
427+
if (instanceConfig.getFunctionDetails().getAutoAck()) {
428+
// the function doesn't produce any result or the user doesn't want the result.
429+
srcRecord.ack();
430+
}
430431
}
432+
// increment total successfully processed
433+
stats.incrTotalProcessedSuccessfully();
431434
}
432-
// increment total successfully processed
433-
stats.incrTotalProcessedSuccessfully();
434-
}
435+
});
435436
}
436437

437438
private void sendOutputMessage(Record srcRecord, Object output) {

pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,109 @@
2222
import static org.testng.Assert.assertEquals;
2323
import static org.testng.Assert.assertNotNull;
2424

25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.Executors;
27+
import lombok.extern.slf4j.Slf4j;
2528
import org.apache.pulsar.functions.api.Function;
2629
import org.apache.pulsar.functions.api.Record;
2730
import org.testng.annotations.Test;
2831

32+
@Slf4j
2933
public class JavaInstanceTest {
3034

3135
/**
3236
* Verify that be able to run lambda functions.
3337
* @throws Exception
3438
*/
3539
@Test
36-
public void testLambda() {
40+
public void testLambda() throws Exception {
3741
JavaInstance instance = new JavaInstance(
38-
mock(ContextImpl.class),
39-
(Function<String, String>) (input, context) -> input + "-lambda");
42+
mock(ContextImpl.class),
43+
(Function<String, String>) (input, context) -> input + "-lambda",
44+
new InstanceConfig());
4045
String testString = "ABC123";
41-
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
42-
assertNotNull(result.getResult());
43-
assertEquals(new String(testString + "-lambda"), result.getResult());
46+
CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
47+
assertNotNull(result.get().getResult());
48+
assertEquals(new String(testString + "-lambda"), result.get().getResult());
49+
instance.close();
50+
}
51+
52+
@Test
53+
public void testAsyncFunction() throws Exception {
54+
InstanceConfig instanceConfig = new InstanceConfig();
55+
56+
Function<String, CompletableFuture<String>> function = (input, context) -> {
57+
log.info("input string: {}", input);
58+
CompletableFuture<String> result = new CompletableFuture<>();
59+
Executors.newCachedThreadPool().submit(() -> {
60+
try {
61+
Thread.sleep(500);
62+
result.complete(String.format("%s-lambda", input));
63+
} catch (Exception e) {
64+
result.completeExceptionally(e);
65+
}
66+
});
67+
68+
return result;
69+
};
70+
71+
JavaInstance instance = new JavaInstance(
72+
mock(ContextImpl.class),
73+
function,
74+
instanceConfig);
75+
String testString = "ABC123";
76+
CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
77+
assertNotNull(result.get().getResult());
78+
assertEquals(new String(testString + "-lambda"), result.get().getResult());
79+
instance.close();
80+
}
81+
82+
@Test
83+
public void testAsyncFunctionMaxPending() throws Exception {
84+
InstanceConfig instanceConfig = new InstanceConfig();
85+
int pendingQueueSize = 2;
86+
instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize);
87+
88+
Function<String, CompletableFuture<String>> function = (input, context) -> {
89+
log.info("input string: {}", input);
90+
CompletableFuture<String> result = new CompletableFuture<>();
91+
Executors.newCachedThreadPool().submit(() -> {
92+
try {
93+
Thread.sleep(500);
94+
result.complete(String.format("%s-lambda", input));
95+
} catch (Exception e) {
96+
result.completeExceptionally(e);
97+
}
98+
});
99+
100+
return result;
101+
};
102+
103+
JavaInstance instance = new JavaInstance(
104+
mock(ContextImpl.class),
105+
function,
106+
instanceConfig);
107+
String testString = "ABC123";
108+
109+
long startTime = System.currentTimeMillis();
110+
assertEquals(pendingQueueSize, instance.getPendingAsyncRequests().remainingCapacity());
111+
CompletableFuture<JavaExecutionResult> result1 = instance.handleMessage(mock(Record.class), testString);
112+
assertEquals(pendingQueueSize - 1, instance.getPendingAsyncRequests().remainingCapacity());
113+
CompletableFuture<JavaExecutionResult> result2 = instance.handleMessage(mock(Record.class), testString);
114+
assertEquals(pendingQueueSize - 2, instance.getPendingAsyncRequests().remainingCapacity());
115+
CompletableFuture<JavaExecutionResult> result3 = instance.handleMessage(mock(Record.class), testString);
116+
// no space left
117+
assertEquals(0, instance.getPendingAsyncRequests().remainingCapacity());
118+
119+
instance.getPendingAsyncRequests().remainingCapacity();
120+
assertNotNull(result1.get().getResult());
121+
assertNotNull(result2.get().getResult());
122+
assertNotNull(result3.get().getResult());
123+
124+
assertEquals(new String(testString + "-lambda"), result1.get().getResult());
125+
long endTime = System.currentTimeMillis();
126+
127+
log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
44128
instance.close();
45129
}
46130
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.functions.api.examples;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.Executors;
23+
import java.util.stream.Collectors;
24+
import org.apache.pulsar.functions.api.Context;
25+
import org.apache.pulsar.functions.api.Function;
26+
import org.slf4j.Logger;
27+
28+
public class AsyncContextFunction implements Function<String, CompletableFuture<Void>> {
29+
@Override
30+
public CompletableFuture<Void> process(String input, Context context) {
31+
Logger LOG = context.getLogger();
32+
CompletableFuture<Void> future = new CompletableFuture();
33+
34+
// this method only delay a function execute.
35+
Executors.newCachedThreadPool().submit(() -> {
36+
try {
37+
Thread.sleep(500);
38+
} catch (Exception e) {
39+
LOG.error("Exception when Thread.sleep", e);
40+
future.completeExceptionally(e);
41+
}
42+
43+
String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
44+
String funcName = context.getFunctionName();
45+
46+
String logMessage = String
47+
.format("A message with value of \"%s\" has arrived on one of the following topics: %s\n",
48+
input, inputTopics);
49+
LOG.info(logMessage);
50+
51+
String metricName = String.format("function-%s-messages-received", funcName);
52+
context.recordMetric(metricName, 1);
53+
54+
future.complete(null);
55+
});
56+
57+
return future;
58+
}
59+
}

0 commit comments

Comments
 (0)