Add support for JMS priority

This commit is contained in:
yabinmeng 2024-05-01 19:04:16 -05:00
parent 4500d05a83
commit c983b1ea34
8 changed files with 90 additions and 58 deletions

View File

@ -37,7 +37,7 @@
</description>
<properties>
<s4j.version>4.0.1</s4j.version>
<s4j.version>4.1.2-alpha</s4j.version>
</properties>
<dependencies>

View File

@ -99,8 +99,9 @@ public class S4JSpace implements AutoCloseable {
// Keep track the transaction count per thread
private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
// Represents the JMS connection
private PulsarConnectionFactory s4jConnFactory;
// Instead of using one "physical" connection per NB process,
// allows creating multiple connections to the Pulsar broker via the "num_conn" parameter
private final ConcurrentHashMap<String, PulsarConnectionFactory> connFactories = new ConcurrentHashMap<>();
private long totalCycleNum;
@ -125,6 +126,7 @@ public class S4JSpace implements AutoCloseable {
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(
cfg.getOptional("session_mode").orElse(""));
this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName);
logger.info("{}", s4JClientConf.toString());
this.setS4JActivityStartTimeMills(System.currentTimeMillis());
@ -217,45 +219,41 @@ public class S4JSpace implements AutoCloseable {
public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); }
public PulsarConnectionFactory getS4jConnFactory() { return s4jConnFactory; }
public long getTotalCycleNum() { return totalCycleNum; }
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
public void initializeSpace(S4JClientConf s4JClientConnInfo) {
if (s4jConnFactory == null) {
Map<String, Object> cfgMap;
try {
cfgMap = s4JClientConnInfo.getS4jConfObjMap();
s4jConnFactory = new PulsarConnectionFactory(cfgMap);
Map<String, Object> cfgMap;
try {
cfgMap = s4JClientConnInfo.getS4jConfObjMap();
for (int i=0; i<getMaxNumConn(); i++) {
// Establish a JMS connection
String connLvlJmsConnContextIdStr =getConnLvlJmsContextIdentifier(i);
for (int i=0; i<getMaxNumConn(); i++) {
String connLvlJmsConnContextIdStr =getConnLvlJmsContextIdentifier(i);
JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
jmsConnContext.setClientID(connLvlJmsConnContextIdStr);
jmsConnContext.setExceptionListener(e -> {
if (logger.isDebugEnabled()) {
logger.error("onException::Unexpected JMS error happened:" + e);
}
});
// Establish a JMS connection
PulsarConnectionFactory s4jConnFactory = connFactories.computeIfAbsent(
connLvlJmsConnContextIdStr,
__ -> new PulsarConnectionFactory(cfgMap));
connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext);
JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
jmsConnContext.setClientID(connLvlJmsConnContextIdStr);
jmsConnContext.setExceptionListener(e -> {
if (logger.isDebugEnabled()) {
logger.debug("[Connection level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsConnContext );
logger.error("onException::Unexpected JMS error happened:" + e);
}
});
connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext);
if (logger.isDebugEnabled()) {
logger.debug("[Connection level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsConnContext );
}
}
catch (JMSRuntimeException e) {
logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
}
catch (Exception e) {
e.printStackTrace();
}
}
catch (JMSRuntimeException e) {
logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
}
}
@ -284,7 +282,9 @@ public class S4JSpace implements AutoCloseable {
if (jmsContext != null) jmsContext.close();
}
s4jConnFactory.close();
for (PulsarConnectionFactory s4jConnFactory : connFactories.values()) {
if (s4jConnFactory != null) s4jConnFactory.close();
}
}
catch (Exception ex) {
String exp = "Unexpected error when shutting down the S4J adaptor space";

View File

@ -40,11 +40,13 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
public static final String MSG_HEADER_OP_PARAM = "msg_header";
public static final String MSG_PRIORITY_OP_PARAM = "msg_priority";
public static final String MSG_PROP_OP_PARAM = "msg_property";
public static final String MSG_BODY_OP_PARAM = "msg_body";
public static final String MSG_TYPE_OP_PARAM = "msg_type";
private final LongFunction<String> msgHeaderRawJsonStrFunc;
private final LongFunction<String> msgPriorityStrFunc;
private final LongFunction<String> msgPropRawJsonStrFunc;
private final LongFunction<String> msgBodyRawJsonStrFunc;
private final LongFunction<String> msgTypeFunc;
@ -56,6 +58,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
super(adapter, op, tgtNameFunc, s4jSpace);
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgPriorityStrFunc = lookupOptionalStrOpValueFunc(MSG_PRIORITY_OP_PARAM);
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM);
@ -272,6 +275,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
public MessageProducerOp getOp(long cycle) {
String destName = destNameStrFunc.apply(cycle);
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
String jmsMsgPriorityStr = msgPriorityStrFunc.apply(cycle);
String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle);
String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle);
@ -294,6 +298,9 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
JMSProducer producer;
try {
producer = getJmsProducer(s4JJMSContextWrapper, asyncAPI);
int priority = NumberUtils.toInt(jmsMsgPriorityStr);
assert (priority >= 0 && priority <= 9);
producer.setPriority(priority);
}
catch (JMSException jmsException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!");

View File

@ -225,7 +225,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
String destType,
String destName) throws JMSRuntimeException
{
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifer();
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifier();
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
S4JSpace.JMSDestinationCacheKey destinationCacheKey =

View File

@ -205,6 +205,30 @@ public class S4JAdapterUtil {
return StringUtils.join(JMS_MESSAGE_TYPES.LABELS, ", ");
}
// Message compression types
public enum MSG_COMPRESSION_TYPE_STR {
LZ4("LZ4"),
ZSTD("ZSTD"),
ZLIB("ZLIB"),
SNAPPY("SNAPPY");
public final String label;
MSG_COMPRESSION_TYPE_STR(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
private static boolean isValidLabel(String label) {
return LABELS.contains(StringUtils.upperCase(label));
}
}
public static String getValidMsgCompressionTypeList() {
return StringUtils.join(MSG_COMPRESSION_TYPE_STR.LABELS, ", ");
}
public static boolean isValidMsgCompressionTypeStr(String type) {
return MSG_COMPRESSION_TYPE_STR.isValidLabel(type);
}
///////
// Convert JSON string to a key/value map
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {

View File

@ -71,34 +71,26 @@ public class S4JClientConfConverter {
/**
* Non-primitive type processing for Pulsar producer configuration items
*/
// "compressionType" has value type "CompressionType"
// - expecting the following values: 'LZ4', 'ZLIB', 'ZSTD', 'SNAPPY'
String confKeyName = "compressionType";
String confVal = pulsarProducerConfMapRaw.get(confKeyName);
String expectedVal = "(LZ4|ZLIB|ZSTD|SNAPPY)";
if (StringUtils.isNotBlank(confVal)) {
if (StringUtils.equalsAnyIgnoreCase(confVal, "LZ4", "ZLIB", "ZSTD", "SNAPPY")) {
CompressionType compressionType = CompressionType.NONE;
switch (StringUtils.upperCase(confVal)) {
case "LZ4":
compressionType = CompressionType.LZ4;
case "ZLIB":
compressionType = CompressionType.ZLIB;
case "ZSTD":
compressionType = CompressionType.ZSTD;
case "SNAPPY":
compressionType = CompressionType.SNAPPY;
}
s4jProducerConfObjMap.put(confKeyName, compressionType);
} else {
throw new S4JAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "producer", expectedVal));
CompressionType compressionType = CompressionType.NONE;
if ( StringUtils.isNotBlank(confVal) ) {
try {
S4JAdapterUtil.MSG_COMPRESSION_TYPE_STR compressionTypeStr =
S4JAdapterUtil.MSG_COMPRESSION_TYPE_STR.valueOf(confVal);
compressionType = switch (compressionTypeStr) {
case LZ4 -> CompressionType.LZ4;
case ZLIB -> CompressionType.ZLIB;
case ZSTD -> CompressionType.ZSTD;
case SNAPPY -> CompressionType.SNAPPY;
};
} catch (IllegalArgumentException e) {
// Any invalid value will be treated as no compression
}
}
s4jProducerConfObjMap.put(confKeyName, compressionType);
// TODO: Skip the following Pulsar configuration items for now because they're not really
// needed in the NB S4J testing at the moment. Add support for them when needed.
// * messageRoutingMode
@ -312,7 +304,9 @@ public class S4JClientConfConverter {
Map.entry("jms.usePulsarAdmin","boolean"),
Map.entry("jms.useServerSideFiltering","boolean"),
Map.entry("jms.waitForServerStartupTimeout","int"),
Map.entry("jms.transactionsStickyPartitions", "boolean")
Map.entry("jms.transactionsStickyPartitions", "boolean"),
Map.entry("jms.enableJMSPriority","boolean"),
Map.entry("jms.priorityMapping","String")
);
public static Map<String, Object> convertRawJmsConf(Map<String, String> s4jJmsConfMapRaw) {
Map<String, Object> s4jJmsConfObjMap = new HashMap<>();

View File

@ -34,7 +34,7 @@ public class S4JJMSContextWrapper {
public int getJmsSessionMode() { return jmsSessionMode; }
public boolean isTransactedMode() { return Session.SESSION_TRANSACTED == this.getJmsSessionMode(); }
public String getJmsContextIdentifer() { return jmsContextIdentifer; }
public String getJmsContextIdentifier() { return jmsContextIdentifer; }
public JMSContext getJmsContext() { return jmsContext; }
public void close() {
@ -45,7 +45,7 @@ public class S4JJMSContextWrapper {
public String toString() {
return new ToStringBuilder(this).
append("jmsContextIdentifer", jmsContextIdentifer).
append("jmsContextIdentifier", jmsContextIdentifer).
append("jmsContext", jmsContext.toString()).
toString();
}

View File

@ -5,6 +5,7 @@ bindings:
mymap_val1: AlphaNumericString(10)
mymap_val2: AlphaNumericString(20)
mystream_val1: AlphaNumericString(50)
my_priority: WeightedLongs('2:20;4:70;8:10')
# document level parameters that apply to all Pulsar client types:
params:
@ -25,6 +26,12 @@ blocks:
"JMSPriority": "9"
}
## (Optional) S4J Message priority emulation (since Pulsar doesn't have native message priority)
# - jms.enableJMSPriority must be set to true in S4J configuration;
# otherwise, the priority value will be ignored.
# - If this is set, the "JMSPriority" value in the header will be ignored.
msg_priority: "{my_priority}"
## (Optional) JMS properties, predefined or customized (in JSON format).
msg_property: |
{