Merge pull request #1946 from yabinmeng/main

Add support for reading a very large payload file (as message body) directly using S4J adapter
This commit is contained in:
Jonathan Shook 2024-05-17 12:06:27 -05:00 committed by GitHub
commit 1d9d50a200
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 67 additions and 16 deletions

View File

@ -65,8 +65,8 @@
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
<artifactId>commons-beanutils-core</artifactId>
<version>1.8.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->

View File

@ -24,14 +24,18 @@ import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.config.standard.Param;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@ -105,6 +109,9 @@ public class S4JSpace implements AutoCloseable {
private long totalCycleNum;
// Large message payload simulation
private final MutablePair<Boolean, String> largePayloadSimPair = MutablePair.of(false, null);
public S4JSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
@ -128,6 +135,29 @@ public class S4JSpace implements AutoCloseable {
this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName);
logger.info("{}", s4JClientConf.toString());
boolean simulateLargePayloadEnabled =
BooleanUtils.toBoolean(cfg.getOptional("simulate_large_payload").orElse("false"));
String simulatedPayloadFile = cfg.getOptional("simulated_payload_file").orElse(null);
if (simulateLargePayloadEnabled &&
(simulatedPayloadFile == null || ! new File(simulatedPayloadFile).exists()) ) {
throw new S4JAdapterInvalidParamException(
"When 'simulate_large_payload' is enabled, 'simulated_payload_file' must be provided and the file must exist.");
}
// Read the large payload file content and store it in the largePayloadSimPair
if (simulateLargePayloadEnabled) {
this.largePayloadSimPair.setLeft(true);
try {
String payloadContent = FileUtils.readFileToString(new File(simulatedPayloadFile), "UTF-8");
this.largePayloadSimPair.setRight(payloadContent);
} catch (Exception ex) {
throw new S4JAdapterUnexpectedException(
"Unable to read the simulated large payload file: " + simulatedPayloadFile);
}
}
logger.info("Simulated large payload enabled: {}, payload file: {}",
simulateLargePayloadEnabled, simulatedPayloadFile);
this.setS4JActivityStartTimeMills(System.currentTimeMillis());
this.initializeSpace(s4JClientConf);
@ -158,6 +188,10 @@ public class S4JSpace implements AutoCloseable {
.setDescription("JMS session mode"))
.add(Param.defaultTo("strict_msg_error_handling", false)
.setDescription("Whether to do strict error handling which is to stop NB S4J execution."))
.add(Param.defaultTo("simulate_large_payload", false)
.setDescription("Whether to simulate large message payload."))
.add(Param.optional("simulated_payload_file").
setDescription("File path to the simulated large message payload."))
.asReadOnly();
}
@ -222,6 +256,9 @@ public class S4JSpace implements AutoCloseable {
public long getTotalCycleNum() { return totalCycleNum; }
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
public Pair<Boolean, String> getLargePayloadSimPair() { return largePayloadSimPair; }
public void initializeSpace(S4JClientConf s4JClientConnInfo) {
Map<String, Object> cfgMap;
try {

View File

@ -27,6 +27,7 @@ import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -48,7 +49,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
private final LongFunction<String> msgHeaderRawJsonStrFunc;
private final LongFunction<String> msgPriorityStrFunc;
private final LongFunction<String> msgPropRawJsonStrFunc;
private final LongFunction<String> msgBodyRawJsonStrFunc;
private final LongFunction<String> msgBodyRawStrFunc;
private final LongFunction<String> msgTypeFunc;
public MessageProducerOpDispenser(DriverAdapter adapter,
@ -60,13 +61,13 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgPriorityStrFunc = lookupOptionalStrOpValueFunc(MSG_PRIORITY_OP_PARAM);
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgBodyRawStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM);
}
private Message createAndSetMessagePayload(
S4JJMSContextWrapper s4JJMSContextWrapper,
String msgType, String msgBodyRawJsonStr) throws JMSException
String msgType, String msgPayload) throws JMSException
{
Message message;
int messageSize = 0;
@ -75,8 +76,8 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label)) {
message = jmsContext.createTextMessage();
((TextMessage) message).setText(msgBodyRawJsonStr);
messageSize = msgBodyRawJsonStr.length();
((TextMessage) message).setText(msgPayload);
messageSize = msgPayload.length();
} else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.MAP.label)) {
message = jmsContext.createMapMessage();
@ -84,7 +85,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
// Otherwise, it is an error
Map<String, String> jmsMsgBodyMap;
try {
jmsMsgBodyMap = S4JAdapterUtil.convertJsonToMap(msgBodyRawJsonStr);
jmsMsgBodyMap = S4JAdapterUtil.convertJsonToMap(msgPayload);
} catch (Exception e) {
throw new RuntimeException("The specified message payload can't be converted to a map when requiring a 'Map' message type!");
}
@ -102,7 +103,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
// Otherwise, it is an error
List<Object> jmsMsgBodyObjList;
try {
jmsMsgBodyObjList = S4JAdapterUtil.convertJsonToObjList(msgBodyRawJsonStr);
jmsMsgBodyObjList = S4JAdapterUtil.convertJsonToObjList(msgPayload);
} catch (Exception e) {
throw new RuntimeException("The specified message payload can't be converted to a list of Objects when requiring a 'Stream' message type!");
}
@ -113,13 +114,13 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
}
} else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.OBJECT.label)) {
message = jmsContext.createObjectMessage();
((ObjectMessage) message).setObject(msgBodyRawJsonStr);
messageSize += msgBodyRawJsonStr.getBytes().length;
((ObjectMessage) message).setObject(msgPayload);
messageSize += msgPayload.getBytes().length;
}
// default: BYTE message type
else {
message = jmsContext.createBytesMessage();
byte[] msgBytePayload = msgBodyRawJsonStr.getBytes();
byte[] msgBytePayload = msgPayload.getBytes();
((BytesMessage)message).writeBytes(msgBytePayload);
messageSize += msgBytePayload.length;
}
@ -277,9 +278,18 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
String jmsMsgPriorityStr = msgPriorityStrFunc.apply(cycle);
String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle);
String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle);
if (StringUtils.isBlank(jmsMsgBodyRawJsonStr)) {
// If 'simulate_large_payload' is enabled, replace the actual message payload with a static
// large payload that is read when the adapter is initialized
String effectiveMsgBody;
Pair<Boolean, String> largePayloadSimPair = s4jSpace.getLargePayloadSimPair();
if (largePayloadSimPair.getLeft()) {
effectiveMsgBody = largePayloadSimPair.getRight();
}
else {
effectiveMsgBody = msgBodyRawStrFunc.apply(cycle);
}
if (StringUtils.isBlank(effectiveMsgBody)) {
throw new S4JAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}
@ -322,7 +332,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
//
Message message;
try {
message = createAndSetMessagePayload(s4JJMSContextWrapper, jmsMsgType, jmsMsgBodyRawJsonStr);
message = createAndSetMessagePayload(s4JJMSContextWrapper, jmsMsgType, effectiveMsgBody);
}
catch (JMSException jmsException) {
throw new RuntimeException("Failed to set create a JMS message and set its payload!");

View File

@ -79,7 +79,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
this.asyncAPI =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.ASYNC_API.label, Boolean.TRUE);
this.txnBatchNum =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0));
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, 0);
this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticConfig("threads", String.class));
this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticConfig("cycles", String.class));

View File

@ -42,6 +42,10 @@ blocks:
msg_type: "text"
## (Mandatory) JMS message body. Value depends on msg_type.
# NOTE: using NB binding variable to generate the message body may be ignored
# if input CLI parameter 'simulate_large_payload' is set to true. In this case,
# 'simulated_payload_file' must be set to a valid file path. and all messages will
# have the same payload content as read from the file.
msg_body: "{mytext_val}"
# # example of having "map" as the message type