Skip to content

Commit 61296d9

Browse files
author
Andras Beni
authored
[improve][meta]Allow version to start positive and grow by more than one (apache#19503)
1 parent 3e8c7de commit 61296d9

File tree

6 files changed

+41
-15
lines changed

6 files changed

+41
-15
lines changed

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,20 @@
1919
package org.apache.pulsar.metadata.api;
2020

2121
import lombok.Data;
22+
import lombok.RequiredArgsConstructor;
2223

2324
/**
2425
* Represent the information associated with a given value in the store.
2526
*/
2627
@Data
28+
@RequiredArgsConstructor
2729
public class Stat {
2830

31+
public Stat(String path, long version, long creationTimestamp, long modificationTimestamp, boolean ephemeral,
32+
boolean createdBySelf) {
33+
this(path, version, creationTimestamp, modificationTimestamp, ephemeral, createdBySelf, version == 0);
34+
}
35+
2936
/**
3037
* The path of the value.
3138
*/
@@ -55,4 +62,9 @@ public class Stat {
5562
* Whether the key-value pair had been created within the current "session".
5663
*/
5764
final boolean createdBySelf;
65+
66+
/**
67+
* Whether this is the first version of the key-value pair since it has been last created.
68+
*/
69+
final boolean firstVersion;
5870
}

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ public final CompletableFuture<Stat> putInternal(String path, byte[] data, Optio
466466
return storePut(path, data, optExpectedVersion,
467467
(options != null && !options.isEmpty()) ? EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class))
468468
.thenApply(stat -> {
469-
NotificationType type = stat.getVersion() == 0 ? NotificationType.Created
469+
NotificationType type = stat.isFirstVersion() ? NotificationType.Created
470470
: NotificationType.Modified;
471471
if (type == NotificationType.Created) {
472472
existsCache.synchronous().invalidate(path);

pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,15 +290,18 @@ public void insertionWithInvalidation(String provider, Supplier<String> urlSuppl
290290
assertEquals(objCache.get(key1).join(), Optional.empty());
291291

292292
MyClass value1 = new MyClass("a", 1);
293-
store.put(key1, ObjectMapperFactory.getMapper().writer().writeValueAsBytes(value1), Optional.of(-1L)).join();
293+
Stat putResult = store.put(key1, ObjectMapperFactory.getMapper().writer().writeValueAsBytes(value1),
294+
Optional.of(-1L)).join();
295+
assertTrue(putResult.isFirstVersion());
294296

295297
Awaitility.await().untilAsserted(() -> {
296298
assertEquals(objCache.getIfCached(key1), Optional.of(value1));
297299
assertEquals(objCache.get(key1).join(), Optional.of(value1));
298300
});
299301

300302
MyClass value2 = new MyClass("a", 2);
301-
store.put(key1, ObjectMapperFactory.getMapper().writer().writeValueAsBytes(value2), Optional.of(0L)).join();
303+
store.put(key1, ObjectMapperFactory.getMapper().writer().writeValueAsBytes(value2),
304+
Optional.of(putResult.getVersion())).join();
302305

303306
Awaitility.await().untilAsserted(() -> {
304307
assertEquals(objCache.getIfCached(key1), Optional.of(value2));

pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,18 @@ public void testPutVersionErrors(String provider, Supplier<String> urlSupplier)
104104
CompletableFuture<Stat> f3 = store.put(key1 + "/c", new byte[0], Optional.of(-1L)); // Should succeed
105105
CompletableFuture<Void> f4 = store.delete(key1 + "/d", Optional.empty()); // Should fail
106106

107-
assertEquals(f1.join().getVersion(), 0L);
107+
assertTrue(f1.join().getVersion() >= 0L);
108+
assertTrue(f1.join().isFirstVersion());
108109

109110
try {
110111
f2.join();
111112
} catch (CompletionException ce) {
112113
assertEquals(ce.getCause().getClass(), BadVersionException.class);
113114
}
114115

115-
assertEquals(f3.join().getVersion(), 0L);
116+
assertTrue(f3.join().getVersion() >= 0L);
117+
assertTrue(f3.join().isFirstVersion());
118+
116119

117120
try {
118121
f4.join();

pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public void sequentialKeys(String provider, Supplier<String> urlSupplier) throws
4545
Stat stat1 = store.put(basePath, "value-1".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Sequential))
4646
.join();
4747
assertNotNull(stat1);
48-
assertEquals(stat1.getVersion(), 0L);
48+
assertTrue(stat1.getVersion() >= 0L);
49+
assertTrue(stat1.isFirstVersion());
4950
assertNotEquals(stat1.getPath(), basePath);
5051
assertEquals(store.get(stat1.getPath()).join().get().getValue(), "value-1".getBytes());
5152
String seq1 = stat1.getPath().replace(basePath, "");
@@ -54,7 +55,8 @@ public void sequentialKeys(String provider, Supplier<String> urlSupplier) throws
5455
Stat stat2 = store.put(basePath, "value-2".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Sequential))
5556
.join();
5657
assertNotNull(stat2);
57-
assertEquals(stat2.getVersion(), 0L);
58+
assertTrue(stat2.getVersion() >= 0L);
59+
assertTrue(stat2.isFirstVersion());
5860
assertNotEquals(stat2.getPath(), basePath);
5961
assertNotEquals(stat2.getPath(), stat1.getPath());
6062
assertEquals(store.get(stat2.getPath()).join().get().getValue(), "value-2".getBytes());

pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,16 @@ public void insertionTestWithExpectedVersion(String provider, Supplier<String> u
130130
assertException(e, BadVersionException.class);
131131
}
132132

133-
store.put(key1, "value-1".getBytes(), Optional.of(-1L)).join();
133+
var putRes = store.put(key1, "value-1".getBytes(), Optional.of(-1L)).join();
134+
long putVersion = putRes.getVersion();
135+
assertTrue(putVersion >= 0);
136+
assertTrue(putRes.isFirstVersion());
134137

135138
assertTrue(store.exists(key1).join());
136139
Optional<GetResult> optRes = store.get(key1).join();
137140
assertTrue(optRes.isPresent());
138141
assertEquals(optRes.get().getValue(), "value-1".getBytes());
139-
assertEquals(optRes.get().getStat().getVersion(), 0);
142+
assertEquals(optRes.get().getStat().getVersion(), putVersion);
140143

141144
try {
142145
store.put(key1, "value-2".getBytes(), Optional.of(-1L)).join();
@@ -146,7 +149,7 @@ public void insertionTestWithExpectedVersion(String provider, Supplier<String> u
146149
}
147150

148151
try {
149-
store.put(key1, "value-2".getBytes(), Optional.of(1L)).join();
152+
store.put(key1, "value-2".getBytes(), Optional.of(putVersion + 1)).join();
150153
fail("Should have failed");
151154
} catch (CompletionException e) {
152155
assertException(e, BadVersionException.class);
@@ -156,15 +159,16 @@ public void insertionTestWithExpectedVersion(String provider, Supplier<String> u
156159
optRes = store.get(key1).join();
157160
assertTrue(optRes.isPresent());
158161
assertEquals(optRes.get().getValue(), "value-1".getBytes());
159-
assertEquals(optRes.get().getStat().getVersion(), 0);
162+
assertEquals(optRes.get().getStat().getVersion(), putVersion);
160163

161-
store.put(key1, "value-2".getBytes(), Optional.of(0L)).join();
164+
putRes = store.put(key1, "value-2".getBytes(), Optional.of(putVersion)).join();
165+
assertTrue(putRes.getVersion() > putVersion);
162166

163167
assertTrue(store.exists(key1).join());
164168
optRes = store.get(key1).join();
165169
assertTrue(optRes.isPresent());
166170
assertEquals(optRes.get().getValue(), "value-2".getBytes());
167-
assertEquals(optRes.get().getStat().getVersion(), 1);
171+
assertEquals(optRes.get().getStat().getVersion(), putRes.getVersion());
168172
}
169173

170174
@Test(dataProvider = "impl")
@@ -315,20 +319,22 @@ public void notificationListeners(String provider, Supplier<String> urlSupplier)
315319
Stat stat = store.put(key1, "value-1".getBytes(), Optional.empty()).join();
316320
assertTrue(store.get(key1).join().isPresent());
317321
assertEquals(store.getChildren(key1).join(), Collections.emptyList());
318-
assertEquals(stat.getVersion(), 0);
322+
assertTrue(stat.getVersion() >= 0);
323+
assertTrue(stat.isFirstVersion());
319324

320325
Notification n = notifications.poll(3, TimeUnit.SECONDS);
321326
assertNotNull(n);
322327
assertEquals(n.getType(), NotificationType.Created);
323328
assertEquals(n.getPath(), key1);
329+
var firstVersion = stat.getVersion();
324330

325331
// Trigger modified notification
326332
stat = store.put(key1, "value-2".getBytes(), Optional.empty()).join();
327333
n = notifications.poll(3, TimeUnit.SECONDS);
328334
assertNotNull(n);
329335
assertEquals(n.getType(), NotificationType.Modified);
330336
assertEquals(n.getPath(), key1);
331-
assertEquals(stat.getVersion(), 1);
337+
assertTrue(stat.getVersion() > firstVersion);
332338

333339
// Trigger modified notification on the parent
334340
String key1Child = key1 + "/xx";

0 commit comments

Comments
 (0)