From 8aff79cba41e4277425f3e35871a8be008c0ce16 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 Mar 2021 15:24:51 +0100 Subject: [PATCH 1/3] Handle correctly Pulsar Consumer configuration --- .../main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index ee58f6848..8dea929bc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -11,6 +11,7 @@ import org.apache.pulsar.client.impl.DefaultBatcherBuilder; import org.apache.pulsar.client.impl.ProducerImpl; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -323,7 +324,8 @@ public class PulsarSpace { PulsarClient pulsarClient = getPulsarClient(); // Get other possible producer settings that are set at global level - Map consumerConf = pulsarNBClientConf.getConsumerConfMap(); + Map consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap()); + consumerConf.remove("timeout"); // Explicit topic names will take precedence over topics pattern if (!topicNames.isEmpty()) { From abbea4df7175b62e6155cecb1412b34380e8537a Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 Mar 2021 17:49:32 +0100 Subject: [PATCH 2/3] Add ErrorHandler --- .../driver/pulsar/PulsarAction.java | 20 +++++++++++++++---- .../driver/pulsar/PulsarActivity.java | 4 ++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java index e7da412fa..853a70ad7 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java @@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.engine.api.activityapi.core.SyncAction; +import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -12,6 +13,7 @@ public class PulsarAction implements SyncAction { private final static Logger logger = LogManager.getLogger(PulsarAction.class); + private static final int MAX_TRIALS = 100; private final int slot; private final PulsarActivity activity; @@ -26,6 +28,7 @@ public class PulsarAction implements SyncAction { @Override public int runCycle(long cycle) { + long start = System.nanoTime(); PulsarOp pulsarOp; try (Timer.Context ctx = activity.getBindTimer().time()) { @@ -33,17 +36,26 @@ public class PulsarAction implements SyncAction { pulsarOp = readyPulsarOp.apply(cycle); } catch (Exception bindException) { // if diagnostic mode ... + activity.getErrorhandler().handleError(bindException, cycle, 0); throw new RuntimeException( "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException ); } - try (Timer.Context ctx = activity.getExecuteTimer().time()) { - pulsarOp.run(); + for (int i = 0; i < MAX_TRIALS; i++) { + try (Timer.Context ctx = activity.getExecuteTimer().time()) { + pulsarOp.run(); + break; + } catch (RuntimeException err) { + ErrorDetail errorDetail = activity + .getErrorhandler() + .handleError(err, cycle, System.nanoTime() - start); + if (!errorDetail.isRetryable()) { + break; + } + } } - // TODO: add retries and use standard error handler - return 0; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index fda1f16cd..c0ff8fe7e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -59,6 +59,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve ); } + public NBErrorHandler getErrorhandler() { + return errorhandler; + } + @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); From 84a9a4bf834cefcf0b0bc407050788f5944c37b4 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 Mar 2021 18:01:30 +0100 Subject: [PATCH 3/3] make tries configurable as in HttpAction --- .../main/java/io/nosqlbench/driver/pulsar/PulsarAction.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java index 853a70ad7..f5f64609a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java @@ -13,13 +13,14 @@ public class PulsarAction implements SyncAction { private final static Logger logger = LogManager.getLogger(PulsarAction.class); - private static final int MAX_TRIALS = 100; private final int slot; private final PulsarActivity activity; + int maxTries = 1; public PulsarAction(PulsarActivity activity, int slot) { this.activity = activity; this.slot = slot; + this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10); } @Override @@ -42,7 +43,7 @@ public class PulsarAction implements SyncAction { ); } - for (int i = 0; i < MAX_TRIALS; i++) { + for (int i = 0; i < maxTries; i++) { try (Timer.Context ctx = activity.getExecuteTimer().time()) { pulsarOp.run(); break;