Convert message error runtime exception to counter metrics

This commit is contained in:
Yabin Meng 2021-10-13 10:10:48 -05:00
parent 7c2e26d80d
commit 686af3d9ca
8 changed files with 231 additions and 140 deletions

View File

@ -38,6 +38,12 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
// Metrics for NB Pulsar driver milestone: https://github.com/nosqlbench/nosqlbench/milestone/11
// - end-to-end latency
private Histogram e2eMsgProcLatencyHistogram;
// - message out of sequence error counter
private Counter msgErrOutOfSeqCounter;
// - message loss counter
private Counter msgErrLossCounter;
// - message duplicate (when dedup is enabled) error counter
private Counter msgErrDuplicateCounter;
private PulsarSpaceCache pulsarCache;
@ -76,6 +82,9 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
commitTransactionTimer = ActivityMetrics.timer(activityDef, "commit_transaction");
e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(activityDef, "e2e_msg_latency");
msgErrOutOfSeqCounter = ActivityMetrics.counter(activityDef, "err_msg_oos");
msgErrLossCounter = ActivityMetrics.counter(activityDef, "err_msg_loss");
msgErrDuplicateCounter = ActivityMetrics.counter(activityDef, "err_msg_dup");
String pulsarClntConfFile =
activityDef.getParams().getOptionalString("config").orElse("config.properties");
@ -231,4 +240,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer getCommitTransactionTimer() { return commitTransactionTimer; }
public Histogram getE2eMsgProcLatencyHistogram() { return e2eMsgProcLatencyHistogram; }
public Counter getMsgErrOutOfSeqCounter() { return msgErrOutOfSeqCounter; }
public Counter getMsgErrLossCounter() { return msgErrLossCounter; }
public Counter getMsgErrDuplicateCounter() { return msgErrDuplicateCounter; }
}

View File

@ -1,15 +1,11 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.function.LongFunction;
@ -34,6 +30,14 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
private final LongFunction<String> subscriptionTypeFunc;
private final boolean e2eMsProc;
// Used for message loss checking
private long prevMsgSeqId = -1;
// Used for early quiting when there are message loss
// Otherwise, sync API may unblock unnecessarily
private long totalMsgLossCnt = 0;
private long maxMsgSeqToExpect = -1;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
@ -52,17 +56,33 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
this.e2eMsProc = e2eMsgProc;
}
public long getPrevMsgSeqId() { return prevMsgSeqId; }
public void setPrevMsgSeqId(long prevMsgSeqId) { this.prevMsgSeqId = prevMsgSeqId; }
public long getTotalMsgLossCnt() { return totalMsgLossCnt; }
public void setTotalMsgLossCnt(long totalMsgLossCnt) { this.totalMsgLossCnt = totalMsgLossCnt; }
public long getMaxMsgSeqToExpect() { return maxMsgSeqToExpect; }
public void setMaxMsgSeqToExpect(long maxMsgSeqToExpect) { this.maxMsgSeqToExpect = maxMsgSeqToExpect; }
@Override
public PulsarOp apply(long value) {
boolean seqTracking = seqTrackingFunc.apply(value);
if ( seqTracking && (maxMsgSeqToExpect != -1) ) {
if ( (value + totalMsgLossCnt) > maxMsgSeqToExpect) {
return new PulsarConumerEmptyOp(pulsarActivity);
}
}
Consumer<?> consumer = consumerFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
boolean useTransaction = useTransactionFunc.apply(value);
boolean seqTracking = seqTrackingFunc.apply(value);
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
boolean topicMsgDedup = topicMsgDedupFunc.apply(value);
String subscriptionType = subscriptionTypeFunc.apply(value);
return new PulsarConsumerOp(
this,
pulsarActivity,
asyncApi,
useTransaction,

View File

@ -22,6 +22,7 @@ public class PulsarConsumerOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);
private final PulsarConsumerMapper consumerMapper;
private final PulsarActivity pulsarActivity;
private final boolean asyncPulsarOp;
@ -37,17 +38,25 @@ public class PulsarConsumerOp implements PulsarOp {
private final boolean e2eMsgProc;
private final long curCycleNum;
private long curMsgSeqId;
private long prevMsgSeqId;
private final Counter bytesCounter;
private final Histogram messageSizeHistogram;
private final Timer transactionCommitTimer;
// keep track of end-to-end message latency
private final Histogram e2eMsgProcLatencyHistogram;
// message out-of-sequence error counter
private final Counter msgErrOutOfSeqCounter;
// message out-of-sequence error counter
private final Counter msgErrDuplicateCounter;
// message loss error counter
private final Counter msgErrLossCounter;
// Used for message error tracking
private final boolean ignoreMsgLossCheck;
private final boolean ignoreMsgDupCheck;
public PulsarConsumerOp(
PulsarConsumerMapper consumerMapper,
PulsarActivity pulsarActivity,
boolean asyncPulsarOp,
boolean useTransaction,
@ -61,6 +70,7 @@ public class PulsarConsumerOp implements PulsarOp {
long curCycleNum,
boolean e2eMsgProc)
{
this.consumerMapper = consumerMapper;
this.pulsarActivity = pulsarActivity;
this.asyncPulsarOp = asyncPulsarOp;
@ -76,14 +86,80 @@ public class PulsarConsumerOp implements PulsarOp {
this.curCycleNum = curCycleNum;
this.e2eMsgProc = e2eMsgProc;
this.curMsgSeqId = 0;
this.prevMsgSeqId = (curCycleNum - 1);
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer();
this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram();
this.msgErrOutOfSeqCounter = pulsarActivity.getMsgErrOutOfSeqCounter();
this.msgErrLossCounter = pulsarActivity.getMsgErrLossCounter();
this.msgErrDuplicateCounter = pulsarActivity.getMsgErrDuplicateCounter();
// When message deduplication configuration is not enable, ignore message
// duplication check
this.ignoreMsgDupCheck = !this.topicMsgDedup;
// Limitations of the message sequence based check:
// - For message out of sequence and message duplicate check, it works for
// all subscription types, including "Shared" and "Key_Shared"
// - For message loss, it doesn't work for "Shared" and "Key_Shared"
// subscription types
this.ignoreMsgLossCheck =
StringUtils.equalsAnyIgnoreCase(this.subscriptionType,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label);
}
private void checkAndUpdateMessageErrorCounter(Message message) {
long maxMsgSeqToExpect = consumerMapper.getMaxMsgSeqToExpect();
if (maxMsgSeqToExpect == -1) {
String msgSeqTgtMaxStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_TGTMAX);
if (!StringUtils.isBlank(msgSeqTgtMaxStr)) {
consumerMapper.setMaxMsgSeqToExpect(Long.valueOf(msgSeqTgtMaxStr));
}
}
String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_ID);
if ( !StringUtils.isBlank(msgSeqIdStr) ) {
long prevMsgSeqId = consumerMapper.getPrevMsgSeqId();
long curMsgSeqId = Long.parseLong(msgSeqIdStr);
// Skip out-of-sequence check on the first received message
// - This is because out-of-sequence check requires at least 2
// received messages for comparison
if ( (prevMsgSeqId != -1) && (curMsgSeqId < prevMsgSeqId) ) {
msgErrOutOfSeqCounter.inc();
}
// Similarly, when message duplicate check is needed, we also
// skip the first received message.
if ( !ignoreMsgDupCheck && (prevMsgSeqId != -1) && (curMsgSeqId == prevMsgSeqId) ) {
msgErrDuplicateCounter.inc();
}
// Note that message loss could be happened anywhere, E.g.
// - published messages: 0,1,2,3,4,5
// - message loss scenario:
// * scenario 1: first set of messages are lost - received 2,3,4
// * scenario 2: messages in the middle are lost - received 0,1,3,4
// * scenario 3: last set of messages are lost - received 0,1,2
if ( !ignoreMsgLossCheck ) {
// This check covers message loss scenarios 1 and 2
if ( (curMsgSeqId - prevMsgSeqId) > 1 ){
// there could be multiple published messages lost between
// 2 received messages
long msgLostCnt = (curMsgSeqId - prevMsgSeqId) - 1;
msgErrLossCounter.inc(msgLostCnt);
consumerMapper.setTotalMsgLossCnt(consumerMapper.getTotalMsgLossCnt() + msgLostCnt);
}
// TODO: how can we detect message loss scenario 3?
}
prevMsgSeqId = curMsgSeqId;
consumerMapper.setPrevMsgSeqId(prevMsgSeqId);
}
}
@Override
@ -143,47 +219,17 @@ public class PulsarConsumerOp implements PulsarOp {
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message ordering and message loss
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
if ( (seqTracking) && !StringUtils.isBlank(msgSeqIdStr) ) {
curMsgSeqId = Long.parseLong(msgSeqIdStr);
if ( prevMsgSeqId > -1) {
// normal case: message sequence id is monotonically increasing by 1
if ((curMsgSeqId - prevMsgSeqId) != 1) {
// abnormal case: out of ordering
// - for any subscription type, this check should always hold
if (curMsgSeqId < prevMsgSeqId) {
throw new PulsarMsgOutOfOrderException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
// - this sequence based message loss and message duplicate check can't be used for
// "Shared" subscription (ignore this check)
// - TODO: for Key_Shared subscription type, this logic needs to be improved on
// per-key basis
else {
if ( !StringUtils.equalsAnyIgnoreCase(subscriptionType,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label)) {
// abnormal case: message loss
if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new PulsarMsgLossException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new PulsarMsgDuplicateException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
}
}
}
}
}
// keep track of message errors and update error counters
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
if (useTransaction) {
if (!useTransaction) {
consumer.acknowledge(message.getMessageId());
}
else {
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
// little problem: here we are counting the "commit" time
@ -194,14 +240,12 @@ public class PulsarConsumerOp implements PulsarOp {
transaction.commit().get();
}
}
else {
consumer.acknowledge(message.getMessageId());
}
}
catch (Exception e) {
logger.error(
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds);
e.printStackTrace();
throw new PulsarDriverUnexpectedException("" +
"Sync message receiving failed - timeout value: " + timeoutSeconds + " seconds ");
}
@ -254,47 +298,14 @@ public class PulsarConsumerOp implements PulsarOp {
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message ordering, message loss, and message duplication
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
if ( (seqTracking) && !StringUtils.isBlank(msgSeqIdStr) ) {
curMsgSeqId = Long.parseLong(msgSeqIdStr);
// keep track of message errors and update error counters
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
if (prevMsgSeqId > -1) {
// normal case: message sequence id is monotonically increasing by 1
if ((curMsgSeqId - prevMsgSeqId) != 1) {
// abnormal case: out of ordering
// - for any subscription type, this check should always hold
if (curMsgSeqId < prevMsgSeqId) {
throw new PulsarMsgOutOfOrderException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
// - this sequence based message loss and message duplicate check can't be used for
// "Shared" subscription (ignore this check)
// - TODO: for Key_Shared subscription type, this logic needs to be improved on
// per-key basis
else {
if ( !StringUtils.equalsAnyIgnoreCase(subscriptionType,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label)) {
// abnormal case: message loss
if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new PulsarMsgLossException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new PulsarMsgDuplicateException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
}
}
}
}
}
if (useTransaction) {
consumer.acknowledgeAsync(message.getMessageId(), transaction);
if (!useTransaction) {
consumer.acknowledgeAsync(message);
}
else {
consumer.acknowledgeAsync(message);
consumer.acknowledgeAsync(message.getMessageId(), transaction);
}
timeTracker.run();

View File

@ -0,0 +1,21 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import io.nosqlbench.driver.pulsar.PulsarActivity;
public class PulsarConumerEmptyOp implements PulsarOp {
private final PulsarActivity pulsarActivity;
// message loss error counter
private final Counter msgErrLossCounter;
public PulsarConumerEmptyOp(PulsarActivity pulsarActivity) {
this.pulsarActivity = pulsarActivity;
this.msgErrLossCounter = pulsarActivity.getMsgErrDuplicateCounter();
}
@Override
public void run(Runnable timeTracker) {
}
}

View File

@ -0,0 +1,20 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class PulsarProducerEmptyOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarProducerEmptyOp.class);
private final PulsarActivity pulsarActivity;
public PulsarProducerEmptyOp(PulsarActivity pulsarActivity) {
this.pulsarActivity = pulsarActivity;
}
@Override
public void run(Runnable timeTracker) {
}
}

View File

@ -36,6 +36,8 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
private final LongFunction<String> propFunc;
private final LongFunction<String> payloadFunc;
private final long totalCycleCount;
public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
@ -55,6 +57,8 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
this.keyFunc = keyFunc;
this.propFunc = propFunc;
this.payloadFunc = payloadFunc;
this.totalCycleCount = pulsarActivity.getActivityDef().getCycleCount();
}
@Override
@ -66,19 +70,23 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
Producer<?> producer = producerFunc.apply(value);
// Simulate error 10% of the time
boolean lastMsg = (value == (totalCycleCount-1));
// Simulate error 10% of the time, but always ignore
// the last message
float rndVal = RandomUtils.nextFloat(0, 1.0f);
boolean simulationError = (rndVal >= 0) && (rndVal < 0.1f);
String seqErrSimuType = seqErrSimuTypeFunc.apply(value);
boolean simulationError = (!lastMsg) && ((rndVal >= 0) && (rndVal < 0.2f));
String seqErrSimuTypesStr = seqErrSimuTypeFunc.apply(value);
boolean simulateMsgOutofOrder = simulationError &&
!StringUtils.isBlank(seqErrSimuType) &&
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label);
!StringUtils.isBlank(seqErrSimuTypesStr) &&
StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label);
boolean simulateMsgLoss = simulationError &&
!StringUtils.isBlank(seqErrSimuType) &&
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss.label);
!StringUtils.isBlank(seqErrSimuTypesStr) &&
StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss.label);
boolean simulateMsgDup = simulationError &&
!StringUtils.isBlank(seqErrSimuType) &&
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup.label);
!StringUtils.isBlank(seqErrSimuTypesStr) &&
StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup.label);
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
@ -99,36 +107,47 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
}
}
// Set message sequence tracking property
if (seqTracking) {
// normal case
if (!simulateMsgOutofOrder && !simulateMsgDup) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value));
}
// simulate message out of order
else if ( simulateMsgOutofOrder ) {
int rndmOffset = 2;
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID,
String.valueOf((value > rndmOffset) ? (value-rndmOffset) : value));
}
// simulate message duplication
else {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-1));
}
// message loss simulation is not done by message property
// we simply skip sending message in the current NB cycle
}
// Error simulation sequence:
// - message loss > message out of order > message duplication
if (!simulateMsgLoss) {
// Set message sequence tracking property
if (seqTracking) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_TGTMAX,
String.valueOf(pulsarActivity.getActivityDef().getCycleCount()-1));
return new PulsarProducerOp(
pulsarActivity,
asyncApi,
useTransaction,
transactionSupplier,
producer,
clientSpace.getPulsarSchema(),
msgKey,
msgProperties,
msgPayload,
simulateMsgLoss);
// normal case
if (!simulateMsgOutofOrder && !simulateMsgDup) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value));
}
else {
// simulate message out of order
if (simulateMsgOutofOrder) {
int rndmOffset = 2;
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID,
String.valueOf((value > rndmOffset) ? (value - rndmOffset) : value));
}
// simulate message duplication
else if (simulateMsgDup) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value - 1));
}
}
}
return new PulsarProducerOp(
pulsarActivity,
asyncApi,
useTransaction,
transactionSupplier,
producer,
clientSpace.getPulsarSchema(),
msgKey,
msgProperties,
msgPayload);
}
else {
// Simulate message loss, but don't simulate the scenario where
// only the last set of message are lost
return new PulsarProducerEmptyOp(pulsarActivity);
}
}
}

View File

@ -38,7 +38,6 @@ public class PulsarProducerOp implements PulsarOp {
private final String msgKey;
private final Map<String, String> msgProperties;
private final String msgPayload;
private final boolean simulateMsgLoss;
private final Counter bytesCounter;
private final Histogram messageSizeHistogram;
@ -52,8 +51,7 @@ public class PulsarProducerOp implements PulsarOp {
Schema<?> schema,
String key,
Map<String, String> msgProperties,
String payload,
boolean simulateMsgLoss) {
String payload) {
this.pulsarActivity = pulsarActivity;
this.asyncPulsarOp = asyncPulsarOp;
@ -65,7 +63,6 @@ public class PulsarProducerOp implements PulsarOp {
this.msgKey = key;
this.msgProperties = msgProperties;
this.msgPayload = payload;
this.simulateMsgLoss = simulateMsgLoss;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
@ -74,11 +71,6 @@ public class PulsarProducerOp implements PulsarOp {
@Override
public void run(Runnable timeTracker) {
// Skip this cycle (without sending messages) if we're doing message loss simulation
if (simulateMsgLoss) {
return;
}
if ( StringUtils.isBlank(msgPayload)) {
throw new PulsarDriverParamException("Message payload (\"msg-value\") can't be empty!");
}

View File

@ -11,10 +11,6 @@ import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader;