Merge pull request #283 from eolivelli/fix/pulsar-2

Enhancements on Pulsar Driver
This commit is contained in:
Jonathan Shook 2021-03-17 12:03:39 -05:00 committed by GitHub
commit 931172d0f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 5 deletions

View File

@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -14,10 +15,12 @@ public class PulsarAction implements SyncAction {
private final int slot;
private final PulsarActivity activity;
int maxTries = 1;
public PulsarAction(PulsarActivity activity, int slot) {
this.activity = activity;
this.slot = slot;
this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
}
@Override
@ -26,6 +29,7 @@ public class PulsarAction implements SyncAction {
@Override
public int runCycle(long cycle) {
long start = System.nanoTime();
PulsarOp pulsarOp;
try (Timer.Context ctx = activity.getBindTimer().time()) {
@ -33,17 +37,26 @@ public class PulsarAction implements SyncAction {
pulsarOp = readyPulsarOp.apply(cycle);
} catch (Exception bindException) {
// if diagnostic mode ...
activity.getErrorhandler().handleError(bindException, cycle, 0);
throw new RuntimeException(
"while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
);
}
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
pulsarOp.run();
for (int i = 0; i < maxTries; i++) {
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
pulsarOp.run();
break;
} catch (RuntimeException err) {
ErrorDetail errorDetail = activity
.getErrorhandler()
.handleError(err, cycle, System.nanoTime() - start);
if (!errorDetail.isRetryable()) {
break;
}
}
}
// TODO: add retries and use standard error handler
return 0;
}
}

View File

@ -59,6 +59,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
);
}
public NBErrorHandler getErrorhandler() {
return errorhandler;
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);

View File

@ -11,6 +11,7 @@ import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -323,7 +324,8 @@ public class PulsarSpace {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> consumerConf = pulsarNBClientConf.getConsumerConfMap();
Map<String, Object> consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap());
consumerConf.remove("timeout");
// Explicit topic names will take precedence over topics pattern
if (!topicNames.isEmpty()) {