Add customized Pulsar driver exceptions

This commit is contained in:
Yabin Meng 2021-09-29 11:27:24 -05:00
parent f72bdb6b95
commit 949790b797
9 changed files with 99 additions and 46 deletions

View File

@ -0,0 +1,8 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarDriverParamException extends RuntimeException {
public PulsarDriverParamException(String message) {
super(message);
}
}

View File

@ -0,0 +1,9 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarDriverUnexpectedException extends RuntimeException {
public PulsarDriverUnexpectedException(String message) {
super(message);
}
public PulsarDriverUnexpectedException(Exception e) { super(e); }
}

View File

@ -0,0 +1,7 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarDriverUnsupportedOpException extends RuntimeException {
public PulsarDriverUnsupportedOpException() { super("Unsupported Pulsar driver operation type"); }
}

View File

@ -0,0 +1,10 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgDuplicateException extends RuntimeException {
public PulsarMsgDuplicateException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]") +
" Detected duplicate message when message deduplication is enabled (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + ").");
}
}

View File

@ -0,0 +1,11 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgLossException extends RuntimeException {
public PulsarMsgLossException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]") +
" Detected message sequence id gap (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Some published messages are not received!");
}
}

View File

@ -0,0 +1,11 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgOutOfOrderException extends RuntimeException {
public PulsarMsgOutOfOrderException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]" ) +
" Detected message ordering is not guaranteed (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Older messages are received earlier!");
}
}

View File

@ -4,6 +4,7 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.*;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
@ -149,22 +150,16 @@ public class PulsarConsumerOp implements PulsarOp {
if ((curMsgSeqId - prevMsgSeqId) != 1) {
// abnormal case: out of ordering
if (curMsgSeqId < prevMsgSeqId) {
throw new RuntimeException("" +
"[SyncAPI] Detected message ordering is not guaranteed (curCycleNum=" + curCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Older messages are received earlier!");
throw new PulsarMsgOutOfOrderException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
// abnormal case: message loss
else if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new RuntimeException("" +
"[SyncAPI] Detected message sequence id gap (curCycleNum=" + curCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Some published messages are not received!");
throw new PulsarMsgLossException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new RuntimeException("" +
"[SyncAPI] Detected duplicate message when message deduplication is enabled " +
"(curCycleNum=" + curCycleNum + ", curMsgSeqId=" + curMsgSeqId +
", prevMsgSeqId=" + prevMsgSeqId + ")!");
throw new PulsarMsgDuplicateException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
}
}
@ -193,7 +188,8 @@ public class PulsarConsumerOp implements PulsarOp {
catch (Exception e) {
logger.error(
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds);
throw new RuntimeException(e);
throw new PulsarDriverUnexpectedException("" +
"Sync message receiving failed - timeout value: " + timeoutSeconds + " seconds ");
}
}
else {
@ -254,22 +250,16 @@ public class PulsarConsumerOp implements PulsarOp {
if ((curMsgSeqId - prevMsgSeqId) != 1) {
// abnormal case: out of ordering
if (curMsgSeqId < prevMsgSeqId) {
throw new RuntimeException("" +
"[AsyncAPI] Detected message ordering is not guaranteed (curCycleNum=" + curCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Older messages are received earlier!");
throw new PulsarMsgOutOfOrderException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
// abnormal case: message loss
else if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new RuntimeException("" +
"[AsyncAPI] Detected message sequence id gap (curCycleNum=" + curCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Some published messages are not received!");
throw new PulsarMsgLossException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new RuntimeException("" +
"[AsyncAPI] Detected duplicate message when message deduplication is enabled " +
"(curCycleNum=" + curCycleNum + ", curMsgSeqId=" + curMsgSeqId +
", prevMsgSeqId=" + prevMsgSeqId + ")!");
throw new PulsarMsgDuplicateException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
}
}
@ -289,7 +279,7 @@ public class PulsarConsumerOp implements PulsarOp {
});
}
catch (Exception e) {
throw new RuntimeException(e);
throw new PulsarDriverUnexpectedException("Async message receiving failed");
}
}
}

View File

@ -4,6 +4,8 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverParamException;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnexpectedException;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
@ -77,8 +79,8 @@ public class PulsarProducerOp implements PulsarOp {
return;
}
if ((msgPayload == null) || msgPayload.isEmpty()) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
if ( StringUtils.isBlank(msgPayload)) {
throw new PulsarDriverParamException("Message payload (\"msg-value\") can't be empty!");
}
TypedMessageBuilder typedMessageBuilder;
@ -158,12 +160,15 @@ public class PulsarProducerOp implements PulsarOp {
}
}
catch (PulsarClientException | ExecutionException | InterruptedException pce) {
logger.trace(
String errMsg =
"Sync message sending failed: " +
"key - " + msgKey + "; " +
"properties - " + msgProperties + "; " +
"payload - " + msgPayload);
throw new RuntimeException(pce);
"payload - " + msgPayload;
logger.trace(errMsg);
throw new PulsarDriverUnexpectedException(errMsg);
}
timeTracker.run();
@ -219,7 +224,7 @@ public class PulsarProducerOp implements PulsarOp {
});
}
catch (Exception e) {
throw new RuntimeException(e);
throw new PulsarDriverUnexpectedException(e);
}
}
}

View File

@ -1,6 +1,8 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.*;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverParamException;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnsupportedOpException;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
@ -41,13 +43,13 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
this.cmdTpl = new CommandTemplate(opTemplate);
if (cmdTpl.isDynamic("op_scope")) {
throw new RuntimeException("op_scope must be static");
throw new PulsarDriverParamException("\"op_scope\" parameter must be static");
}
// TODO: At the moment, only supports static "client"
if (cmdTpl.containsKey("client")) {
if (cmdTpl.isDynamic("client")) {
throw new RuntimeException("\"client\" can't be made dynamic!");
throw new PulsarDriverParamException("\"client\" parameter can't be made dynamic!");
} else {
String client_name = cmdTpl.getStatic("client");
this.clientSpace = pcache.getPulsarSpace(client_name);
@ -67,12 +69,12 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
private LongFunction<PulsarOp> resolve() {
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
throw new PulsarDriverParamException("[resolve()] \"optype\" parameter must be static and have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
throw new PulsarDriverParamException("[resolve()] \"topic_url\" parameter is not valid. Perhaps you mean \"topic_uri\"?");
}
// Doc-level parameter: topic_uri
@ -93,7 +95,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label));
asyncApiFunc = (l) -> value;
} else {
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
}
}
logger.info("async_api: {}", asyncApiFunc.apply(0));
@ -105,7 +107,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label));
useTransactionFunc = (l) -> value;
} else {
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label + "\" parameter cannot be dynamic!");
}
}
logger.info("use_transaction: {}", useTransactionFunc.apply(0));
@ -116,7 +118,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label))
adminDelOpFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label));
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label + "\" parameter cannot be dynamic!");
}
logger.info("admin_delop: {}", adminDelOpFunc.apply(0));
@ -126,7 +128,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label))
seqTrackingFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label));
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label + "\" parameter cannot be dynamic!");
}
logger.info("seq_tracking: {}", seqTrackingFunc.apply(0));
@ -136,7 +138,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label))
brokerMsgDedupFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label));
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label + "\" parameter cannot be dynamic!");
}
logger.info("msg_dedup_broker: {}", seqTrackingFunc.apply(0));
@ -212,7 +214,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
// Invalid operation type
else {
throw new RuntimeException("Unsupported Pulsar operation type");
throw new PulsarDriverUnsupportedOpException();
}
}
@ -224,7 +226,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
{
if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) {
throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
throw new PulsarDriverParamException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
}
LongFunction<Set<String>> adminRolesFunc;
@ -357,7 +359,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
if (cmdTpl.isStatic("seqerr_simu")) {
seqErrSimuTypeFunc = (l) -> cmdTpl.getStatic("seqerr_simu");
} else {
throw new RuntimeException("\"seqerr_simu\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolveMsgSend()] \"seqerr_simu\" parameter cannot be dynamic!");
}
}
@ -391,7 +393,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
valueFunc = (l) -> null;
}
} else {
throw new RuntimeException("Producer:: \"msg_value\" field must be specified!");
throw new PulsarDriverParamException("[resolveMsgSend()] \"msg_value\" field must be specified!");
}
return new PulsarProducerMapper(
@ -682,7 +684,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
valueFunc = (l) -> null;
}
} else {
throw new RuntimeException("Batch Producer:: \"msg_value\" field must be specified!");
throw new PulsarDriverParamException("[resolveMsgBatchSend()] \"msg_value\" field must be specified!");
}
return new PulsarBatchProducerMapper(