Pulsar: allow to configure 'ranges' for KEY_SHARED subscription type

This commit is contained in:
Enrico Olivelli
2022-03-02 15:15:09 +01:00
parent f501f35122
commit 15a7da061e
2 changed files with 40 additions and 2 deletions

View File

@@ -396,7 +396,8 @@ public class PulsarSpace {
public Consumer<?> getConsumer(String cycleTopicName,
String cycleSubscriptionName,
String cycleSubscriptionType,
String cycleConsumerName) {
String cycleConsumerName,
String cycleKeySharedSubscriptionRanges) {
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
@@ -432,6 +433,16 @@ public class PulsarSpace {
subscriptionName(subscriptionName).
subscriptionType(subscriptionType);
if (subscriptionType == SubscriptionType.Key_Shared) {
KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange();
if (cycleKeySharedSubscriptionRanges != null && !cycleKeySharedSubscriptionRanges.isEmpty()) {
Range[] ranges = parseRanges(cycleKeySharedSubscriptionRanges);
logger.info("Configuring KeySharedPolicy#stickyHashRange with ranges {}", ranges);
keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(ranges);
}
consumerBuilder.keySharedPolicy(keySharedPolicy);
}
if (!StringUtils.isBlank(consumerName)) {
consumerBuilder = consumerBuilder.consumerName(consumerName);
}
@@ -471,6 +482,30 @@ public class PulsarSpace {
return consumer;
}
private static Range[] parseRanges(String ranges) {
if (ranges == null || ranges.isEmpty()) {
return new Range[0];
}
String[] split = ranges.split(",");
Range[] result = new Range[split.length];
for (int i = 0; i < split.length; i++) {
String range = split[i];
int pos = range.indexOf("..");
if (pos <= 0) {
throw new IllegalArgumentException("Invalid range '" + range + "'");
}
try {
int start = Integer.parseInt(range.substring(0, pos));
int end = Integer.parseInt(range.substring(pos + 2));
result[i] = Range.of(start, end);
} catch (NumberFormatException err) {
throw new IllegalArgumentException("Invalid range '" + range + "'");
}
}
return result;
}
//
//////////////////////////////////////
// Consumer Processing <-- end

View File

@@ -322,6 +322,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<String> consumer_name_func = lookupParameterFunc("consumer_name");
LongFunction<String> ranges_func = lookupParameterFunc("ranges", false, "");
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
@@ -330,7 +332,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
topic_uri_func.apply(l),
subscription_name_func.apply(l),
subscription_type_func.apply(l),
consumer_name_func.apply(l)
consumer_name_func.apply(l),
ranges_func.apply(l)
);
return new PulsarConsumerMapper(