diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index 6d24f312a..e3aa48163 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -38,6 +38,12 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve // Metrics for NB Pulsar driver milestone: https://github.com/nosqlbench/nosqlbench/milestone/11 // - end-to-end latency private Histogram e2eMsgProcLatencyHistogram; + // - message out of sequence error counter + private Counter msgErrOutOfSeqCounter; + // - message loss counter + private Counter msgErrLossCounter; + // - message duplicate (when dedup is enabled) error counter + private Counter msgErrDuplicateCounter; private PulsarSpaceCache pulsarCache; @@ -76,6 +82,9 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve commitTransactionTimer = ActivityMetrics.timer(activityDef, "commit_transaction"); e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(activityDef, "e2e_msg_latency"); + msgErrOutOfSeqCounter = ActivityMetrics.counter(activityDef, "err_msg_oos"); + msgErrLossCounter = ActivityMetrics.counter(activityDef, "err_msg_loss"); + msgErrDuplicateCounter = ActivityMetrics.counter(activityDef, "err_msg_dup"); String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties"); @@ -231,4 +240,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer getCommitTransactionTimer() { return commitTransactionTimer; } public Histogram getE2eMsgProcLatencyHistogram() { return e2eMsgProcLatencyHistogram; } + public Counter getMsgErrOutOfSeqCounter() { return msgErrOutOfSeqCounter; } + public Counter getMsgErrLossCounter() { return msgErrLossCounter; } + public Counter getMsgErrDuplicateCounter() { return msgErrDuplicateCounter; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index cae3afd42..25af5d8ac 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -1,15 +1,11 @@ package io.nosqlbench.driver.pulsar.ops; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; import java.util.function.LongFunction; @@ -34,6 +30,14 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { private final LongFunction subscriptionTypeFunc; private final boolean e2eMsProc; + // Used for message loss checking + private long prevMsgSeqId = -1; + + // Used for early quiting when there are message loss + // Otherwise, sync API may unblock unnecessarily + private long totalMsgLossCnt = 0; + private long maxMsgSeqToExpect = -1; + public PulsarConsumerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, PulsarActivity pulsarActivity, @@ -52,17 +56,33 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { this.e2eMsProc = e2eMsgProc; } + public long getPrevMsgSeqId() { return prevMsgSeqId; } + public void setPrevMsgSeqId(long prevMsgSeqId) { this.prevMsgSeqId = prevMsgSeqId; } + + public long getTotalMsgLossCnt() { return totalMsgLossCnt; } + public void setTotalMsgLossCnt(long totalMsgLossCnt) { this.totalMsgLossCnt = totalMsgLossCnt; } + + public long getMaxMsgSeqToExpect() { return maxMsgSeqToExpect; } + public void setMaxMsgSeqToExpect(long maxMsgSeqToExpect) { this.maxMsgSeqToExpect = maxMsgSeqToExpect; } + @Override public PulsarOp apply(long value) { + boolean seqTracking = seqTrackingFunc.apply(value); + if ( seqTracking && (maxMsgSeqToExpect != -1) ) { + if ( (value + totalMsgLossCnt) > maxMsgSeqToExpect) { + return new PulsarConumerEmptyOp(pulsarActivity); + } + } + Consumer consumer = consumerFunc.apply(value); boolean asyncApi = asyncApiFunc.apply(value); boolean useTransaction = useTransactionFunc.apply(value); - boolean seqTracking = seqTrackingFunc.apply(value); Supplier transactionSupplier = transactionSupplierFunc.apply(value); boolean topicMsgDedup = topicMsgDedupFunc.apply(value); String subscriptionType = subscriptionTypeFunc.apply(value); return new PulsarConsumerOp( + this, pulsarActivity, asyncApi, useTransaction, diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 24f61acac..7dfc3dd95 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -22,6 +22,7 @@ public class PulsarConsumerOp implements PulsarOp { private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class); + private final PulsarConsumerMapper consumerMapper; private final PulsarActivity pulsarActivity; private final boolean asyncPulsarOp; @@ -37,17 +38,25 @@ public class PulsarConsumerOp implements PulsarOp { private final boolean e2eMsgProc; private final long curCycleNum; - private long curMsgSeqId; - private long prevMsgSeqId; - private final Counter bytesCounter; private final Histogram messageSizeHistogram; private final Timer transactionCommitTimer; // keep track of end-to-end message latency private final Histogram e2eMsgProcLatencyHistogram; + // message out-of-sequence error counter + private final Counter msgErrOutOfSeqCounter; + // message out-of-sequence error counter + private final Counter msgErrDuplicateCounter; + // message loss error counter + private final Counter msgErrLossCounter; + + // Used for message error tracking + private final boolean ignoreMsgLossCheck; + private final boolean ignoreMsgDupCheck; public PulsarConsumerOp( + PulsarConsumerMapper consumerMapper, PulsarActivity pulsarActivity, boolean asyncPulsarOp, boolean useTransaction, @@ -61,6 +70,7 @@ public class PulsarConsumerOp implements PulsarOp { long curCycleNum, boolean e2eMsgProc) { + this.consumerMapper = consumerMapper; this.pulsarActivity = pulsarActivity; this.asyncPulsarOp = asyncPulsarOp; @@ -76,14 +86,80 @@ public class PulsarConsumerOp implements PulsarOp { this.curCycleNum = curCycleNum; this.e2eMsgProc = e2eMsgProc; - this.curMsgSeqId = 0; - this.prevMsgSeqId = (curCycleNum - 1); - this.bytesCounter = pulsarActivity.getBytesCounter(); this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram(); this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer(); this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram(); + this.msgErrOutOfSeqCounter = pulsarActivity.getMsgErrOutOfSeqCounter(); + this.msgErrLossCounter = pulsarActivity.getMsgErrLossCounter(); + this.msgErrDuplicateCounter = pulsarActivity.getMsgErrDuplicateCounter(); + + // When message deduplication configuration is not enable, ignore message + // duplication check + this.ignoreMsgDupCheck = !this.topicMsgDedup; + + // Limitations of the message sequence based check: + // - For message out of sequence and message duplicate check, it works for + // all subscription types, including "Shared" and "Key_Shared" + // - For message loss, it doesn't work for "Shared" and "Key_Shared" + // subscription types + this.ignoreMsgLossCheck = + StringUtils.equalsAnyIgnoreCase(this.subscriptionType, + PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label, + PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label); + } + + private void checkAndUpdateMessageErrorCounter(Message message) { + long maxMsgSeqToExpect = consumerMapper.getMaxMsgSeqToExpect(); + if (maxMsgSeqToExpect == -1) { + String msgSeqTgtMaxStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_TGTMAX); + if (!StringUtils.isBlank(msgSeqTgtMaxStr)) { + consumerMapper.setMaxMsgSeqToExpect(Long.valueOf(msgSeqTgtMaxStr)); + } + } + + String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_ID); + + if ( !StringUtils.isBlank(msgSeqIdStr) ) { + long prevMsgSeqId = consumerMapper.getPrevMsgSeqId(); + long curMsgSeqId = Long.parseLong(msgSeqIdStr); + + // Skip out-of-sequence check on the first received message + // - This is because out-of-sequence check requires at least 2 + // received messages for comparison + if ( (prevMsgSeqId != -1) && (curMsgSeqId < prevMsgSeqId) ) { + msgErrOutOfSeqCounter.inc(); + } + + // Similarly, when message duplicate check is needed, we also + // skip the first received message. + if ( !ignoreMsgDupCheck && (prevMsgSeqId != -1) && (curMsgSeqId == prevMsgSeqId) ) { + msgErrDuplicateCounter.inc(); + } + + // Note that message loss could be happened anywhere, E.g. + // - published messages: 0,1,2,3,4,5 + // - message loss scenario: + // * scenario 1: first set of messages are lost - received 2,3,4 + // * scenario 2: messages in the middle are lost - received 0,1,3,4 + // * scenario 3: last set of messages are lost - received 0,1,2 + if ( !ignoreMsgLossCheck ) { + // This check covers message loss scenarios 1 and 2 + if ( (curMsgSeqId - prevMsgSeqId) > 1 ){ + // there could be multiple published messages lost between + // 2 received messages + long msgLostCnt = (curMsgSeqId - prevMsgSeqId) - 1; + msgErrLossCounter.inc(msgLostCnt); + consumerMapper.setTotalMsgLossCnt(consumerMapper.getTotalMsgLossCnt() + msgLostCnt); + } + + // TODO: how can we detect message loss scenario 3? + } + + prevMsgSeqId = curMsgSeqId; + consumerMapper.setPrevMsgSeqId(prevMsgSeqId); + } } @Override @@ -143,47 +219,17 @@ public class PulsarConsumerOp implements PulsarOp { e2eMsgProcLatencyHistogram.update(e2eMsgLatency); } - // keep track of message ordering and message loss - String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID); - if ( (seqTracking) && !StringUtils.isBlank(msgSeqIdStr) ) { - curMsgSeqId = Long.parseLong(msgSeqIdStr); - - if ( prevMsgSeqId > -1) { - // normal case: message sequence id is monotonically increasing by 1 - if ((curMsgSeqId - prevMsgSeqId) != 1) { - // abnormal case: out of ordering - // - for any subscription type, this check should always hold - if (curMsgSeqId < prevMsgSeqId) { - throw new PulsarMsgOutOfOrderException( - false, curCycleNum, curMsgSeqId, prevMsgSeqId); - } - // - this sequence based message loss and message duplicate check can't be used for - // "Shared" subscription (ignore this check) - // - TODO: for Key_Shared subscription type, this logic needs to be improved on - // per-key basis - else { - if ( !StringUtils.equalsAnyIgnoreCase(subscriptionType, - PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label, - PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label)) { - // abnormal case: message loss - if ((curMsgSeqId - prevMsgSeqId) > 1) { - throw new PulsarMsgLossException( - false, curCycleNum, curMsgSeqId, prevMsgSeqId); - } else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) { - throw new PulsarMsgDuplicateException( - false, curCycleNum, curMsgSeqId, prevMsgSeqId); - } - } - } - } - } - } + // keep track of message errors and update error counters + if (seqTracking) checkAndUpdateMessageErrorCounter(message); int messageSize = message.getData().length; bytesCounter.inc(messageSize); messageSizeHistogram.update(messageSize); - if (useTransaction) { + if (!useTransaction) { + consumer.acknowledge(message.getMessageId()); + } + else { consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); // little problem: here we are counting the "commit" time @@ -194,14 +240,12 @@ public class PulsarConsumerOp implements PulsarOp { transaction.commit().get(); } } - else { - consumer.acknowledge(message.getMessageId()); - } } catch (Exception e) { logger.error( "Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds); + e.printStackTrace(); throw new PulsarDriverUnexpectedException("" + "Sync message receiving failed - timeout value: " + timeoutSeconds + " seconds "); } @@ -254,47 +298,14 @@ public class PulsarConsumerOp implements PulsarOp { e2eMsgProcLatencyHistogram.update(e2eMsgLatency); } - // keep track of message ordering, message loss, and message duplication - String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID); - if ( (seqTracking) && !StringUtils.isBlank(msgSeqIdStr) ) { - curMsgSeqId = Long.parseLong(msgSeqIdStr); + // keep track of message errors and update error counters + if (seqTracking) checkAndUpdateMessageErrorCounter(message); - if (prevMsgSeqId > -1) { - // normal case: message sequence id is monotonically increasing by 1 - if ((curMsgSeqId - prevMsgSeqId) != 1) { - // abnormal case: out of ordering - // - for any subscription type, this check should always hold - if (curMsgSeqId < prevMsgSeqId) { - throw new PulsarMsgOutOfOrderException( - false, curCycleNum, curMsgSeqId, prevMsgSeqId); - } - // - this sequence based message loss and message duplicate check can't be used for - // "Shared" subscription (ignore this check) - // - TODO: for Key_Shared subscription type, this logic needs to be improved on - // per-key basis - else { - if ( !StringUtils.equalsAnyIgnoreCase(subscriptionType, - PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label, - PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label)) { - // abnormal case: message loss - if ((curMsgSeqId - prevMsgSeqId) > 1) { - throw new PulsarMsgLossException( - false, curCycleNum, curMsgSeqId, prevMsgSeqId); - } else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) { - throw new PulsarMsgDuplicateException( - false, curCycleNum, curMsgSeqId, prevMsgSeqId); - } - } - } - } - } - } - - if (useTransaction) { - consumer.acknowledgeAsync(message.getMessageId(), transaction); + if (!useTransaction) { + consumer.acknowledgeAsync(message); } else { - consumer.acknowledgeAsync(message); + consumer.acknowledgeAsync(message.getMessageId(), transaction); } timeTracker.run(); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConumerEmptyOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConumerEmptyOp.java new file mode 100644 index 000000000..255a81351 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConumerEmptyOp.java @@ -0,0 +1,21 @@ +package io.nosqlbench.driver.pulsar.ops; + +import com.codahale.metrics.Counter; +import io.nosqlbench.driver.pulsar.PulsarActivity; + +public class PulsarConumerEmptyOp implements PulsarOp { + + private final PulsarActivity pulsarActivity; + + // message loss error counter + private final Counter msgErrLossCounter; + + public PulsarConumerEmptyOp(PulsarActivity pulsarActivity) { + this.pulsarActivity = pulsarActivity; + this.msgErrLossCounter = pulsarActivity.getMsgErrDuplicateCounter(); + } + + @Override + public void run(Runnable timeTracker) { + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerEmptyOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerEmptyOp.java new file mode 100644 index 000000000..f94af3982 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerEmptyOp.java @@ -0,0 +1,20 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarActivity; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class PulsarProducerEmptyOp implements PulsarOp { + + private final static Logger logger = LogManager.getLogger(PulsarProducerEmptyOp.class); + + private final PulsarActivity pulsarActivity; + + public PulsarProducerEmptyOp(PulsarActivity pulsarActivity) { + this.pulsarActivity = pulsarActivity; + } + + @Override + public void run(Runnable timeTracker) { + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 9bdf63881..594f40406 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -36,6 +36,8 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper { private final LongFunction propFunc; private final LongFunction payloadFunc; + private final long totalCycleCount; + public PulsarProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, PulsarActivity pulsarActivity, @@ -55,6 +57,8 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper { this.keyFunc = keyFunc; this.propFunc = propFunc; this.payloadFunc = payloadFunc; + + this.totalCycleCount = pulsarActivity.getActivityDef().getCycleCount(); } @Override @@ -66,19 +70,23 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper { Producer producer = producerFunc.apply(value); - // Simulate error 10% of the time + boolean lastMsg = (value == (totalCycleCount-1)); + + // Simulate error 10% of the time, but always ignore + // the last message float rndVal = RandomUtils.nextFloat(0, 1.0f); - boolean simulationError = (rndVal >= 0) && (rndVal < 0.1f); - String seqErrSimuType = seqErrSimuTypeFunc.apply(value); + boolean simulationError = (!lastMsg) && ((rndVal >= 0) && (rndVal < 0.2f)); + + String seqErrSimuTypesStr = seqErrSimuTypeFunc.apply(value); boolean simulateMsgOutofOrder = simulationError && - !StringUtils.isBlank(seqErrSimuType) && - StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label); + !StringUtils.isBlank(seqErrSimuTypesStr) && + StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label); boolean simulateMsgLoss = simulationError && - !StringUtils.isBlank(seqErrSimuType) && - StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss.label); + !StringUtils.isBlank(seqErrSimuTypesStr) && + StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss.label); boolean simulateMsgDup = simulationError && - !StringUtils.isBlank(seqErrSimuType) && - StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup.label); + !StringUtils.isBlank(seqErrSimuTypesStr) && + StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup.label); String msgKey = keyFunc.apply(value); String msgPayload = payloadFunc.apply(value); @@ -99,36 +107,47 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper { } } - // Set message sequence tracking property - if (seqTracking) { - // normal case - if (!simulateMsgOutofOrder && !simulateMsgDup) { - msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value)); - } - // simulate message out of order - else if ( simulateMsgOutofOrder ) { - int rndmOffset = 2; - msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, - String.valueOf((value > rndmOffset) ? (value-rndmOffset) : value)); - } - // simulate message duplication - else { - msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-1)); - } - // message loss simulation is not done by message property - // we simply skip sending message in the current NB cycle - } + // Error simulation sequence: + // - message loss > message out of order > message duplication + if (!simulateMsgLoss) { + // Set message sequence tracking property + if (seqTracking) { + msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_TGTMAX, + String.valueOf(pulsarActivity.getActivityDef().getCycleCount()-1)); - return new PulsarProducerOp( - pulsarActivity, - asyncApi, - useTransaction, - transactionSupplier, - producer, - clientSpace.getPulsarSchema(), - msgKey, - msgProperties, - msgPayload, - simulateMsgLoss); + // normal case + if (!simulateMsgOutofOrder && !simulateMsgDup) { + msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value)); + } + else { + // simulate message out of order + if (simulateMsgOutofOrder) { + int rndmOffset = 2; + msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, + String.valueOf((value > rndmOffset) ? (value - rndmOffset) : value)); + } + // simulate message duplication + else if (simulateMsgDup) { + msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value - 1)); + } + } + } + + return new PulsarProducerOp( + pulsarActivity, + asyncApi, + useTransaction, + transactionSupplier, + producer, + clientSpace.getPulsarSchema(), + msgKey, + msgProperties, + msgPayload); + } + else { + // Simulate message loss, but don't simulate the scenario where + // only the last set of message are lost + return new PulsarProducerEmptyOp(pulsarActivity); + } } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 51c1336b2..796621df9 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -38,7 +38,6 @@ public class PulsarProducerOp implements PulsarOp { private final String msgKey; private final Map msgProperties; private final String msgPayload; - private final boolean simulateMsgLoss; private final Counter bytesCounter; private final Histogram messageSizeHistogram; @@ -52,8 +51,7 @@ public class PulsarProducerOp implements PulsarOp { Schema schema, String key, Map msgProperties, - String payload, - boolean simulateMsgLoss) { + String payload) { this.pulsarActivity = pulsarActivity; this.asyncPulsarOp = asyncPulsarOp; @@ -65,7 +63,6 @@ public class PulsarProducerOp implements PulsarOp { this.msgKey = key; this.msgProperties = msgProperties; this.msgPayload = payload; - this.simulateMsgLoss = simulateMsgLoss; this.bytesCounter = pulsarActivity.getBytesCounter(); this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram(); @@ -74,11 +71,6 @@ public class PulsarProducerOp implements PulsarOp { @Override public void run(Runnable timeTracker) { - // Skip this cycle (without sending messages) if we're doing message loss simulation - if (simulateMsgLoss) { - return; - } - if ( StringUtils.isBlank(msgPayload)) { throw new PulsarDriverParamException("Message payload (\"msg-value\") can't be empty!"); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 8a49170a4..892d35f50 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -11,10 +11,6 @@ import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.pulsar.client.admin.Namespaces; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader;