Skip to content

Commit 9a97c84

Browse files
[feat][broker] PIP-368: Support lookup based on the lookup properties (apache#23223)
PIP: apache#23075 ### Motivation This is the implementation for the PIP: apache#23075 Currently, the lookup process uses only the topic name as its parameter. However, to enhance this process, it's beneficial for clients to provide additional information. This could be done by introducing the `lookupProperties` field in the client configuration. Clients can then share these properties with the broker during lookup. On the broker side, the broker could also contain some properties that are used for the lookup. We can also support the lookupProperties for the broker. The broker can use these properties to make a better decision on which broker to return. Here is the rack-aware lookup scenario for using the client properties for the lookup: Assuming there are two brokers that broker-0 configures the lookup property "rack" with "A" and broker-1 configures the lookup property "rack" with "B". By using the lookup properties, clients can supply rack information during the lookup, enabling the broker to identify and connect them to the nearest broker within the same rack. If a client that configures the "rack" property with "A" connects to a lookup broker, the customized load manager can determine broker-0 as the owner broker since the broker and the client have the same rack property. ### Modifications - Add new configuration `lookupProperties` to the client. While looking up the broker, the client will send the properties to the broker through `CommandLookupTopic` request. - Add `properties` field to the `CommandLookupTopic`. - Add `lookupProperties` to the `LookupOptions`. The Load Manager implementation can access the `properties` through `LookupOptions` to make a better decision on which broker to return. - Introduce a new broker configuration `lookupPropertyPrefix`. Any broker configuration properties that start with the `lookupPropertyPrefix` will be included into the `BrokerLookupData` and be persisted in the metadata store. The broker can use these properties during the lookup. Co-authored-by: Yunze Xu <xyzinfernity@163.com>
1 parent d9bd6b0 commit 9a97c84

File tree

20 files changed

+220
-13
lines changed

20 files changed

+220
-13
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import com.google.common.collect.Lists;
2222
import com.google.common.collect.Sets;
2323
import java.util.ArrayList;
24+
import java.util.HashMap;
2425
import java.util.HashSet;
2526
import java.util.LinkedHashSet;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.Objects;
2830
import java.util.Optional;
2931
import java.util.Properties;
@@ -2946,6 +2948,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
29462948
@com.fasterxml.jackson.annotation.JsonIgnore
29472949
private Properties properties = new Properties();
29482950

2951+
@FieldContext(
2952+
category = CATEGORY_SERVER,
2953+
doc = "The properties whose name starts with this prefix will be uploaded to the metadata store for "
2954+
+ " the topic lookup"
2955+
)
2956+
private String lookupPropertyPrefix = "lookup.";
2957+
29492958
@FieldContext(
29502959
dynamic = true,
29512960
category = CATEGORY_SERVER,
@@ -3743,4 +3752,14 @@ public int getTopicOrderedExecutorThreadNum() {
37433752
public boolean isSystemTopicAndTopicLevelPoliciesEnabled() {
37443753
return topicLevelPoliciesEnabled && systemTopicEnabled;
37453754
}
3755+
3756+
public Map<String, String> lookupProperties() {
3757+
final var map = new HashMap<String, String>();
3758+
properties.forEach((key, value) -> {
3759+
if (key instanceof String && value instanceof String && ((String) key).startsWith(lookupPropertyPrefix)) {
3760+
map.put((String) key, (String) value);
3761+
}
3762+
});
3763+
return map;
3764+
}
37463765
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public BrokerRegistryImpl(PulsarService pulsar) {
9494
pulsar.getConfiguration().isEnableNonPersistentTopics(),
9595
conf.getLoadManagerClassName(),
9696
System.currentTimeMillis(),
97-
pulsar.getBrokerVersion());
97+
pulsar.getBrokerVersion(),
98+
pulsar.getConfig().lookupProperties());
9899
this.state = State.Init;
99100
}
100101

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public record BrokerLookupData (String webServiceUrl,
4141
boolean nonPersistentTopicsEnabled,
4242
String loadManagerClassName,
4343
long startTimestamp,
44-
String brokerVersion) implements ServiceLookupData {
44+
String brokerVersion,
45+
Map<String, String> properties) implements ServiceLookupData {
4546
@Override
4647
public String getWebServiceUrl() {
4748
return this.webServiceUrl();

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.net.InetAddress;
2525
import java.net.URI;
2626
import java.net.URISyntaxException;
27+
import java.util.Collections;
28+
import java.util.Map;
2729
import java.util.Optional;
2830
import java.util.concurrent.CompletableFuture;
2931
import javax.ws.rs.Encoded;
@@ -180,7 +182,7 @@ protected String internalGetNamespaceBundle(TopicName topicName) {
180182
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
181183
boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
182184
return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId,
183-
authenticationData, requestId, null);
185+
authenticationData, requestId, null, Collections.emptyMap());
184186
}
185187

186188
/**
@@ -208,7 +210,8 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
208210
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
209211
boolean authoritative, String clientAppId,
210212
AuthenticationDataSource authenticationData,
211-
long requestId, final String advertisedListenerName) {
213+
long requestId, final String advertisedListenerName,
214+
Map<String, String> properties) {
212215

213216
final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
214217
final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
@@ -299,6 +302,7 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
299302
.authoritative(authoritative)
300303
.advertisedListenerName(advertisedListenerName)
301304
.loadTopicsInBundle(true)
305+
.properties(properties)
302306
.build();
303307
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
304308
.thenAccept(lookupResult -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.namespace;
2020

21+
import java.util.Map;
2122
import lombok.Builder;
2223
import lombok.Data;
2324
import org.apache.commons.lang3.StringUtils;
@@ -46,6 +47,7 @@ public class LookupOptions {
4647
private final boolean requestHttps;
4748

4849
private final String advertisedListenerName;
50+
private final Map<String, String> properties;
4951

5052
public boolean hasAdvertisedListenerName() {
5153
return StringUtils.isNotBlank(advertisedListenerName);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.net.SocketAddress;
4848
import java.util.ArrayList;
4949
import java.util.Collections;
50+
import java.util.HashMap;
5051
import java.util.IdentityHashMap;
5152
import java.util.List;
5253
import java.util.Map;
@@ -544,9 +545,19 @@ protected void handleLookup(CommandLookupTopic lookup) {
544545
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
545546
isAuthorized -> {
546547
if (isAuthorized) {
548+
final Map<String, String> properties;
549+
if (lookup.getPropertiesCount() > 0) {
550+
properties = new HashMap<>();
551+
for (int i = 0; i < lookup.getPropertiesCount(); i++) {
552+
final var keyValue = lookup.getPropertyAt(i);
553+
properties.put(keyValue.getKey(), keyValue.getValue());
554+
}
555+
} else {
556+
properties = Collections.emptyMap();
557+
}
547558
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
548559
getPrincipal(), getAuthenticationData(),
549-
requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
560+
requestId, advertisedListenerName, properties).handle((lookupResponse, ex) -> {
550561
if (ex == null) {
551562
writeAndFlush(lookupResponse);
552563
} else {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.net.URI;
2626
import java.net.URISyntaxException;
27+
import java.util.Collections;
2728
import java.util.HashMap;
2829
import java.util.Map;
2930
import java.util.Optional;
@@ -58,7 +59,8 @@ public void testConstructors() throws PulsarServerException, URISyntaxException
5859
BrokerLookupData lookupData = new BrokerLookupData(
5960
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
6061
pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
61-
ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(),"3.0");
62+
ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(),"3.0",
63+
Collections.emptyMap());
6264
assertEquals(webServiceUrl, lookupData.webServiceUrl());
6365
assertEquals(webServiceUrlTls, lookupData.webServiceUrlTls());
6466
assertEquals(pulsarServiceUrl, lookupData.pulsarServiceUrl());

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.mockito.Mockito.mock;
2323

2424
import java.io.IOException;
25+
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.Map;
2728
import java.util.Optional;
@@ -136,6 +137,6 @@ public BrokerLookupData getLookupData(String version, String loadManagerClassNam
136137
return new BrokerLookupData(
137138
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
138139
pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
139-
loadManagerClassName, -1, version);
140+
loadManagerClassName, -1, version, Collections.emptyMap());
140141
}
141142
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.testng.Assert.assertEquals;
2929
import static org.testng.Assert.assertTrue;
3030

31+
import java.util.Collections;
3132
import java.util.HashMap;
3233
import java.util.Map;
3334
import java.util.Set;
@@ -218,7 +219,7 @@ public BrokerLookupData getLookupData(boolean persistentTopicsEnabled,
218219
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
219220
pulsarServiceUrlTls, advertisedListeners, protocols,
220221
persistentTopicsEnabled, nonPersistentTopicsEnabled,
221-
ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(), "3.0.0");
222+
ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(), "3.0.0", Collections.emptyMap());
222223
}
223224

224225
public LoadManagerContext getContext() {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.pulsar.broker.lookup.LookupResult;
3434
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
3535
import org.testng.annotations.Test;
36+
37+
import java.util.Collections;
3638
import java.util.HashMap;
3739
import java.util.Map;
3840
import java.util.Optional;
@@ -106,6 +108,6 @@ public BrokerLookupData getLookupData(String broker, String loadManagerClassName
106108
return new BrokerLookupData(
107109
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
108110
pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
109-
loadManagerClassName, startTimeStamp, "3.0.0");
111+
loadManagerClassName, startTimeStamp, "3.0.0", Collections.emptyMap());
110112
}
111113
}

0 commit comments

Comments
 (0)